wanglihui
2022-12-06 19:10:28 +08:00
21 changed files with 1561 additions and 31 deletions

View File

@@ -9,6 +9,11 @@ import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
*/
public class CommonConfig {
/**
* Default separator for location-database file names
*/
public static final String LOCATION_SEPARATOR = ".";
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
@@ -69,6 +74,24 @@ public class CommonConfig {
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout");
public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token");
public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single");
public static final String HDFS_URI_NS1 = CommonConfigurations.getStringProperty("hdfs.uri.nn1");
public static final String HDFS_URI_NS2 = CommonConfigurations.getStringProperty("hdfs.uri.nn2");
public static final String HDFS_PATH = CommonConfigurations.getStringProperty("hdfs.path");
public static final String HDFS_USER = CommonConfigurations.getStringProperty("hdfs.user");
public static final String DOWNLOAD_PATH = CommonConfigurations.getStringProperty("download.path");
public static void main(String[] args) {
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// Configure the password/salt used for encryption and decryption
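For reference, a minimal sketch of how jasypt's StandardPBEStringEncryptor is typically driven; the password below is a placeholder, not the project's real salt:

StandardPBEStringEncryptor enc = new StandardPBEStringEncryptor();
enc.setPassword("example-salt"); // placeholder; the real value comes from configuration
String cipherText = enc.encrypt("some-secret"); // e.g. an encrypted nacos.password value
String plainText = enc.decrypt(cipherText); // round-trips back to "some-secret"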

View File

@@ -0,0 +1,26 @@
package com.zdjizhi.common;
import java.io.Serializable;
/**
* Simple serializable container for a downloaded knowledge file.
*/
public class CustomFile implements Serializable {
String fileName;
byte[] content;
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}

View File

@@ -0,0 +1,91 @@
package com.zdjizhi.common;
/**
* Metadata for one knowledge-base entry as published via Nacos.
*/
public class KnowledgeLog {
public String id;
public String name;
public String path;
public Long size;
public String format;
public String sha256;
public String version;
public String updateTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public Long getSize() {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public String getFormat() {
return format;
}
public void setFormat(String format) {
this.format = format;
}
public String getSha256() {
return sha256;
}
public void setSha256(String sha256) {
this.sha256 = sha256;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getUpdateTime() {
return updateTime;
}
public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "KnowledgeLog{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", path='" + path + '\'' +
", size=" + size +
", format='" + format + '\'' +
", sha256='" + sha256 + '\'' +
", version='" + version + '\'' +
", updateTime='" + updateTime + '\'' +
'}';
}
}

View File

@@ -1,30 +1,38 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.function.BroadcastProcessFunc;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.*;
/**
* @author wlh
*/
public class ParseSketchLog {
// private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static final Log logger = LogFactory.get();
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
@@ -35,7 +43,28 @@ public class ParseSketchLog {
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
DataStreamSource<Map<String, byte[]>> broadcastSource=null;
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD);
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH);
}else {
broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
}
MapStateDescriptor<String,Map> descriptor =
new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
BroadcastStream<Map<String, byte[]>> broadcast = broadcastSource.broadcast(descriptor);
// BroadcastConnectedStream<String, List<CustomFile>> connect = DosSketchSource.createDosSketchSource().connect(broadcast);
return DosSketchSource.createDosSketchSource()
.connect(broadcast).process(new BroadcastProcessFunc());
// .flatMap(new FlatSketchLog());
}
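BroadcastProcessFunc is referenced above but its body is not part of this diff; a minimal sketch of what such a BroadcastProcessFunction could look like, assuming it keeps the FlatSketchLog-style parsing on the data side and rebuilds the IP lookup whenever a knowledge snapshot is broadcast:

public class BroadcastProcessFunc extends BroadcastProcessFunction<String, Map<String, byte[]>, DosSketchLog> {
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<DosSketchLog> out) {
// parse the raw sketch JSON and emit DosSketchLog records, as FlatSketchLog did
}
@Override
public void processBroadcastElement(Map<String, byte[]> files, Context ctx, Collector<DosSketchLog> out) {
// a new knowledge snapshot arrived: rebuild the in-memory IP lookup from it
IpUtils.updateIpLook(files);
}
}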
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
@@ -53,14 +82,12 @@ public class ParseSketchLog {
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
dosSketchLog.setVsys_id(vsysId);
String sourceIp = obj.get("source_ip").toString();
String destinationIp = obj.get("destination_ip").toString();
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());

View File

@@ -16,6 +16,10 @@ class TrafficServerIpMetricsSink {
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
}
}

View File

@@ -1,12 +1,15 @@
package com.zdjizhi.source;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
@@ -31,4 +34,13 @@ public class DosSketchSource {
new SimpleStringSchema(), properties))
.setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
}
public static DataStreamSource<Map<String, byte[]>> broadcastSource(Properties nacosProperties, String STORE_PATH){
return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT,STORE_PATH));
}
public static DataStreamSource<Map<String, byte[]>> singleBroadcastSource(Properties nacosProperties){
return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
}
}

View File

@@ -0,0 +1,182 @@
package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.KnowledgeLog;
import com.zdjizhi.utils.*;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
// Nacos connection properties
private Properties nacosProperties;
//nacos data id
private String NACOS_DATA_ID;
//nacos group
private String NACOS_GROUP;
// Nacos read timeout (ms)
private long NACOS_READ_TIMEOUT;
// HDFS path where knowledge files are stored
private String STORE_PATH;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static Map<String, String> updateMap = new HashMap<>();
private static HashMap<String, byte[]> knowledgeFileCache;
private boolean isRunning = true;
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
this.STORE_PATH = storePath;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the metadata (sha256) cache
updateMap = new HashMap<>(16);
// Initialize the location-database file cache
knowledgeFileCache = new HashMap<>(16);
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(new Date());
logger.info(format + " received config from nacos: " + config);
System.out.println(format + " received config from nacos: " + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
}
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
}
}
ctx.collect(knowledgeFileCache);
}
}
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
while (isRunning) {
Thread.sleep(10000);
}
}
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
try {
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
byte[] bytes = IOUtils.toByteArray(inputStream);
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
knowledgeFileCache.put(fileName, bytes);
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
}
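To make EXPR concrete: given a Nacos config entry like the following (all field values invented), JsonPath returns one name/sha256/format/path map per matching element:

String demo = "[{\"name\":\"ip_v4_built_in\",\"version\":\"latest\",\"sha256\":\"abc123\"," +
"\"format\":\"mmdb\",\"path\":\"http://hos.example/ip_v4_built_in.mmdb\"}]";
ArrayList<Object> metaList = JsonPath.parse(demo).read(EXPR);
// -> [{name=ip_v4_built_in, sha256=abc123, format=mmdb, path=http://hos.example/...}]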

View File

@@ -0,0 +1,6 @@
package com.zdjizhi.source;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
* Marker interface for HTTP-backed Flink source functions.
*/
public interface HttpSourceFunction<OUT> extends SourceFunction<OUT> {
}

View File

@@ -0,0 +1,10 @@
package com.zdjizhi.source;
import org.apache.flink.api.common.functions.AbstractRichFunction;
/**
* Rich variant of {@link HttpSourceFunction} with access to the runtime context.
*/
public abstract class RichHttpSourceFunction<OUT> extends AbstractRichFunction implements HttpSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
public RichHttpSourceFunction() {
}
}

View File

@@ -0,0 +1,217 @@
package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.KnowledgeLog;
import com.zdjizhi.utils.*;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.*;
import java.util.concurrent.Executor;
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
private static HashMap<String, byte[]> knowledgeFileCache;
private Properties nacosProperties;
private String NACOS_DATA_ID;
private String NACOS_GROUP;
private long NACOS_READ_TIMEOUT;
private static String STORE_PATH;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> updateMap = new HashMap<>();
private boolean isRunning = true;
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
// Initialize the metadata (sha256) cache
updateMap = new HashMap<>(16);
// Initialize the location-database file cache
knowledgeFileCache = new HashMap<>(16);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
// List<CustomFile> customFiles = new ArrayList<>();
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
}
// if (StringUtil.isNotBlank(config)) {
// List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
// if (knowledgeLogListList.size()>=1){
// for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
// String sha256 = knowledgeLog.getSha256();
// updateMap.put(name,sha256);
// }
// }
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
}
}
ctx.collect(knowledgeFileCache);
}
}
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
while (isRunning) {
Thread.sleep(10000);
}
}
// private CustomFile loadKnowledge(String fileName, String filePath) {
// InputStream inputStream = null;
// FileOutputStream outputStream = null;
// CustomFile customFile = new CustomFile();
// try {
// customFile.setFileName(fileName);
// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
// HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
// outputStream = new FileOutputStream(file);
// byte[] bytes = IOUtils.toByteArray(inputStream);
// customFile.setContent(bytes);
// inputStream = new ByteArrayInputStream(customFile.getContent());
// IoUtil.copy(inputStream, outputStream);
//
// } catch (IOException ioException) {
// ioException.printStackTrace();
// } finally {
// IOUtils.closeQuietly(inputStream);
// IOUtils.closeQuietly(outputStream);
// }
// return customFile;
// }
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
try {
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
outputStream = new FileOutputStream(file);
byte[] bytes = IOUtils.toByteArray(inputStream);
knowledgeFileCache.put(fileName, bytes);
inputStream=new ByteArrayInputStream(bytes);
IoUtil.copy(inputStream, outputStream);
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
}

View File

@@ -0,0 +1,24 @@
package com.zdjizhi.utils;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class FileByteUtils {
public static byte[] getFileBytes(String filePath) throws IOException {
// try-with-resources closes both streams even if read() throws
try (FileInputStream fis = new FileInputStream(new File(filePath));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024)) {
byte[] b = new byte[1024];
int n;
while ((n = fis.read(b)) != -1) {
bos.write(b, 0, n);
}
return bos.toByteArray();
}
}
}
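For reference, the JDK (7+) offers an equivalent one-liner (it also throws IOException); the only behavioral difference is the absence of the intermediate 1 KB buffer:

byte[] data = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(filePath));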

View File

@@ -0,0 +1,75 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HdfsUtils {
private static final Log logger = LogFactory.get();
private static FileSystem fileSystem;
static {
Configuration configuration = new Configuration();
try {
// HDFS high-availability connection settings
configuration.set("fs.defaultFS","hdfs://ns1");
configuration.set("hadoop.proxyuser.root.hosts","*");
configuration.set("hadoop.proxyuser.root.groups","*");
configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
configuration.set("dfs.nameservices","ns1");
configuration.set("dfs.ha.namenodes.ns1","nn1,nn2");
configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1);
configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2);
configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// Specify the HDFS user
System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER);
// Create the FileSystem used to connect to HDFS
fileSystem = FileSystem.get(configuration);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static boolean isExists(String filePath) throws IOException {
return fileSystem.exists(new Path(filePath));
}
public static byte[] getFileBytes(String filePath) throws IOException {
Path path = new Path(filePath);
try (FSDataInputStream open = fileSystem.open(path)) {
// available() is not a reliable file length; use the file status and readFully instead
byte[] bytes = new byte[(int) fileSystem.getFileStatus(path).getLen()];
open.readFully(0, bytes);
return bytes;
} catch (IOException e) {
logger.error("I/O exception while downloading a file from HDFS: " + e.getMessage());
}
return null;
}
public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException {
try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) {
fsDataOutputStream.write(bytes);
fsDataOutputStream.flush();
} catch (RuntimeException e) {
logger.error("Runtime exception while uploading a file to HDFS: " + e.getMessage());
} catch (IOException e) {
logger.error("I/O exception while uploading a file to HDFS: " + e.getMessage());
}
}
public static void rename(String src, String dst) throws IOException {
fileSystem.rename(new Path(src),new Path(dst));
}
}
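A quick usage sketch of HdfsUtils as wired up in this commit; the local path is hypothetical, and both HdfsUtils calls declare throws IOException:

byte[] payload = FileByteUtils.getFileBytes("/tmp/ip_v4_built_in.mmdb"); // hypothetical local file
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb", payload);
byte[] roundTrip = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb");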

View File

@@ -0,0 +1,234 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.commons.io.IOUtils;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.Map;
/**
* HTTP client utility class
*/
public class HttpClientUtils2 {
/** Global connection pool */
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils2.class);
public static final String ERROR_MESSAGE = "-1";
/*
* Static initializer that configures the connection pool
*/
static {
// Maximum total connections
CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
// Maximum connections per route
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
}
/**
* Overrides certificate validation before any SSL call so that SSL checks are skipped,
* then creates a ConnectionManager with the connection settings.
*
* @return a connection manager for an HttpClient that supports HTTPS
*/
private PoolingHttpClientConnectionManager getSslClientManager() {
try {
// Override the trust manager so SSL certificate checks are skipped
X509TrustManager trustManager = new X509TrustManager() {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
@Override
public void checkClientTrusted(X509Certificate[] xcs, String str) {
}
@Override
public void checkServerTrusted(X509Certificate[] xcs, String str) {
}
};
SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
ctx.init(null, new TrustManager[]{trustManager}, null);
SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", socketFactory).build();
// Create a ConnectionManager with the connection settings
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// Maximum total connections
connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
// Maximum connections per route
connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
return connManager;
} catch (KeyManagementException | NoSuchAlgorithmException e) {
throw new RuntimeException(e.getMessage());
}
}
/**
* Builds an HTTP client backed by the pooled connection manager.
* @return the HTTP client
*/
private CloseableHttpClient getHttpClient() {
// Request configuration
RequestConfig requestConfig = RequestConfig.custom()
// Timeout for obtaining a connection from the pool
.setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
// Connect timeout
.setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
// Socket (response) timeout
.setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.build();
/*
* Retry handler, configured in case the timeouts alone do not take effect.
* Returning false means the request is not retried; otherwise each
* exception type is inspected to decide whether to retry.
*/
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
if (executionCount >= 3) {// give up after three attempts
return false;
}
if (exception instanceof NoHttpResponseException) {// the server dropped the connection, so retry
return true;
}
if (exception instanceof SSLHandshakeException) {// do not retry SSL handshake failures
return false;
}
if (exception instanceof UnknownHostException) {// target host is unreachable
return false;
}
if (exception instanceof ConnectTimeoutException) {// the connection attempt timed out
return false;
}
if (exception instanceof HttpHostConnectException) {// the connection was refused
return false;
}
if (exception instanceof SSLException) {// other SSL errors
return false;
}
if (exception instanceof InterruptedIOException) {// I/O was interrupted (e.g. a timeout)
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// retry only if the request is idempotent (does not enclose an entity)
return !(request instanceof HttpEntityEnclosingRequest);
};
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && "timeout".equalsIgnoreCase(param)) {
return Long.parseLong(value) * 1000;
}
}
return 60 * 1000;// default keep-alive of 60s when the server does not specify a timeout
};
// Build the HttpClient
return HttpClients.custom()
// apply the request timeout configuration
.setDefaultRequestConfig(requestConfig)
// apply the retry handler
.setRetryHandler(retry)
.setKeepAliveStrategy(myStrategy)
// use the pooled, SSL-aware connection manager
.setConnectionManager(getSslClientManager())
.build();
}
// TODO: 2022/10/19 load the knowledge base
public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
InputStream result = null;
// Obtain a pooled HTTP client connection object
CloseableHttpClient httpClient = getHttpClient();// TODO: 2022/10/19 socketTimeout was removed here; the parameter is currently unused
// Build the GET request
HttpGet httpGet = new HttpGet(url);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
}
}
CloseableHttpResponse response = null;
try {
// Execute the request
response = httpClient.execute(httpGet);
// Buffer the full response entity into memory
result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
// Make sure the entity is fully consumed
EntityUtils.consume(response.getEntity());
} catch (ClientProtocolException e) {
logger.error("current file: {}, protocol error: {}", url, e.getMessage());
} catch (ParseException e) {
logger.error("current file: {}, parser error: {}", url, e.getMessage());
} catch (IOException e) {
logger.error("current file: {}, IO error: {}", url, e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
logger.error("Release Connection error: {}", e.getMessage());
}
}
}
// return outside the finally block so it cannot swallow exceptions
return result;
}
}
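Usage mirrors what HttpSource.loadKnowledge does; a minimal sketch with a placeholder URL and the HOS token header:

Header token = new BasicHeader("token", CommonConfig.HOS_TOKEN);
InputStream in = new HttpClientUtils2().httpGetInputStream("https://hos.example/file.mmdb", 3000, token);
byte[] bytes = IOUtils.toByteArray(in); // the stream is already fully buffered in memory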

View File

@@ -0,0 +1,21 @@
package com.zdjizhi.utils;
import lombok.Data;
/**
* @author fy
* @version 1.0
* @date 2022/10/19 18:27
*/
@Data
public class IpLocationConfiguration {
private String ipV4UserDefined;
private String ipV4BuiltIn;
private String ipV6UserDefined;
private String ipV6BuiltIn;
}

View File

@@ -1,18 +1,104 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
public class IpUtils {
public static IpLookupV2 ipLookup;
private static Logger LOG = LoggerFactory.getLogger(IpUtils.class);
/**
* Utility class for the IP location databases
*/
public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb")
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb")
.build();
// public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb")
// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb")
// .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb")
// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb")
// .build();
public static void loadIpLook(){
try {
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) {
byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes);
builder.loadDataFileV4(ipv4BuiltInputStream);
}
byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes);
builder.loadDataFileV6(ipv6BuiltInputStream);
}
byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes);
builder.loadDataFilePrivateV4(ipv4UserInputStream);
}
byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes);
builder.loadDataFilePrivateV6(ipv6UserInputStream);
}
}else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){
byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes);
builder.loadDataFileV4(ipv4BuiltInputStream);
}
byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes);
builder.loadDataFileV6(ipv6BuiltInputStream);
}
byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes);
builder.loadDataFilePrivateV4(ipv4UserInputStream);
}
byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes);
builder.loadDataFilePrivateV6(ipv6UserInputStream);
}
}
ipLookup = builder.build();
}catch (Exception e){
LOG.error("加载失败",e);
}
}
public static void updateIpLook(Map<String, byte[]> knowledgeFileCache){
try{
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
ipLookup= builder.loadDataFileV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_built_in.mmdb")))
.loadDataFileV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_built_in.mmdb")))
.loadDataFilePrivateV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_user_defined.mmdb")))
.loadDataFilePrivateV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_user_defined.mmdb")))
.build();
}catch (Exception e){
LOG.error("加载失败",e);
}
}
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("49.7.115.37"));

View File

@@ -20,11 +20,11 @@ public class NacosUtils {
private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
private static final String NACOS_STATIC_NAMESPACE = CommonConfigurations.getStringProperty("nacos.static.namespace");
private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
private static final String NACOS_STATIC_DATA_ID = CommonConfigurations.getStringProperty("nacos.static.data.id");
private static final String NACOS_STATIC_GROUP = CommonConfigurations.getStringProperty("nacos.static.group");
private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout");
static {
@@ -33,7 +33,7 @@ public class NacosUtils {
private static void getProperties() {
nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_STATIC_NAMESPACE);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
}
@@ -42,10 +42,11 @@ public class NacosUtils {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(nacosProperties);
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
String config = configService.getConfig(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, NACOS_READ_TIMEOUT);
commonProperties.load(new StringReader(config));
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
configService.addListener(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;