1.适配TSG 23.07及以上功能,添加数据传输统计指标,并输出至pushgateway。(GAL-409)

2.原URL参数domain从http_domain字段取值,更新为从common_server_domain字段取值。(GAL-410)
This commit is contained in:
wangchengcheng
2023-09-28 15:59:26 +08:00
parent c3ad8140e8
commit 5c0a108393
77 changed files with 4726 additions and 2543 deletions

View File

@@ -0,0 +1,41 @@
package com.zdjizhi.tools.general;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.common
* @Description:
* @date 2023/7/2714:22
*/
public class ConfigurationsUtils {
public static String getStringProperty(Properties properties, String key) {
if (!properties.isEmpty() && properties.containsKey(key)) {
return properties.getProperty(key).trim();
}
return "";
}
public static Integer getIntProperty(Properties properties, String key) {
if (!properties.isEmpty() && properties.containsKey(key)) {
return Integer.parseInt(properties.getProperty(key).trim());
}
return 0;
}
public static Long getLongProperty(Properties properties, String key) {
if (!properties.isEmpty() && properties.containsKey(key)) {
return Long.parseLong(properties.getProperty(key).trim());
}
return 0L;
}
public static Boolean getBooleanProperty(Properties properties, String key) {
if (!properties.isEmpty() && properties.containsKey(key)) {
return Boolean.parseBoolean(properties.getProperty(key).trim());
}
return false;
}
}

View File

@@ -0,0 +1,49 @@
package com.zdjizhi.tools.general;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.ordinary.MD5Utils;
import static com.zdjizhi.common.FlowWriteConfig.judgeFileType;
/**
* 文件字段操作工具
*/
public class FileEdit {
public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){
String fileType = null;
if (judgeFileType(getFileType(urlValue))){
fileType = getFileType(urlValue);
}else {
if (schemaType.equals("HTTP")){
fileType = "html";
}
if (schemaType.equals("MAIL")){
fileType = "eml";
}
}
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account;
}
public static String getFileDownloadUrl(String fileId){
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId;
}
public static String getFileType(String url){
String[] split = url.split("\\.");
return split[split.length-1];
}
public static String getFileId(String url,String fileSuffix) throws Exception {
String[] arr = url.split("/");
String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
String prefix = MD5Utils.md5Encode(filename);
// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
return prefix+fileSuffix;
}
}

View File

@@ -0,0 +1,199 @@
package com.zdjizhi.tools.general;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.*;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.geedgenetworks.utils.IpLookupV2;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Joiner;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
import com.zdjizhi.tools.connections.http.HttpClientService;
import com.zdjizhi.tools.connections.nacos.NacosConnection;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.concurrent.Executor;
/**
* @author qidaijie
* @version 2022/11/16 15:23
*/
public class IpLookupUtils {
private static final Log logger = LogFactory.get();
private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb";
private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb";
private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb";
private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb";
private static final String asnV4Name = "asn_v4.mmdb";
private static final String asnV6Name = "asn_v6.mmdb";
/**
* ip定位库
*/
private static IpLookupV2 ipLookup;
/**
* 定位库默认分隔符
*/
private static final String LOCATION_SEPARATOR = ".";
/**
* 最大重试次数
*/
private static final int TRY_TIMES = 5;
/**
* http connections
*/
private static final HttpClientService httpClientService;
/**
* 定位库元数据缓存
*/
private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);
static {
JSONPath jsonPath = JSONPath.of(getFilterParameter());
httpClientService = new HttpClientService();
NacosConnection nacosConnection = new NacosConnection();
ConfigService schemaService = nacosConnection.getPublicService();
try {
String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
if (StringUtil.isNotBlank(configInfo)) {
updateIpLookup(jsonPath, configInfo);
}
}
});
if (StringUtil.isNotBlank(configInfo)) {
updateIpLookup(jsonPath, configInfo);
}
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
}
}
private static void updateIpLookup(JSONPath jsonPath, String configInfo) {
String extract = jsonPath.extract(JSONReader.of(configInfo)).toString();
if (StringUtil.isNotBlank(extract)) {
JSONArray jsonArray = JSON.parseArray(extract);
if (jsonArray.size() > 0) {
for (int i = 0; i < jsonArray.size(); i++) {
KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat());
knowledgeMetaCache.put(fileName, knowlegeBaseMeta);
}
reloadIpLookup();
}
}
}
/**
* 从HDFS下载文件更新IpLookup
*
* @return 更新后的IpLookup
*/
public static void reloadIpLookup() {
int retryNum = 0;
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
for (String fileName : knowledgeMetaCache.keySet()) {
KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
String metaSha256 = knowlegeBaseMeta.getSha256();
do {
byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
if (httpGetByte.length > 0) {
String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
if (metaSha256.equals(downloadFileSha256)) {
ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
switch (fileName) {
case ipv4BuiltInName:
builder.loadDataFileV4(inputStream);
break;
case ipv6BuiltInName:
builder.loadDataFileV6(inputStream);
break;
case ipv4UserDefinedName:
builder.loadDataFilePrivateV4(inputStream);
break;
case ipv6UserDefinedName:
builder.loadDataFilePrivateV6(inputStream);
break;
case asnV4Name:
builder.loadAsnDataFileV4(inputStream);
break;
case asnV6Name:
builder.loadAsnDataFileV6(inputStream);
break;
default:
}
retryNum = TRY_TIMES;
} else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum);
retryNum++;
}
} else {
logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum);
retryNum++;
}
} while (retryNum < TRY_TIMES);
}
ipLookup = builder.build();
}
/**
* 根据配置组合生成知识库元数据过滤参数
*
* @return 过滤参数
*/
private static String getFilterParameter() {
String[] typeList = CommonConfig.KNOWLEDGEBASE_TYPE_LIST.split(",");
String[] nameList = CommonConfig.KNOWLEDGEBASE_NAME_LIST.split(",");
String expr = "[?(@.version=='latest')]";
if (typeList.length > 1) {
StringBuilder typeBuilder = new StringBuilder();
typeBuilder.append("[?(@.type in (");
for (int i = 0; i < typeList.length; i++) {
if (i == typeList.length - 1) {
typeBuilder.append("'").append(typeList[i]).append("'))]");
} else {
typeBuilder.append("'").append(typeList[i]).append("',");
}
}
expr = expr + typeBuilder;
}
if (nameList.length > 1) {
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append("[?(@.name in (");
for (int i = 0; i < nameList.length; i++) {
if (i == nameList.length - 1) {
nameBuilder.append("'").append(nameList[i]).append("'))]");
} else {
nameBuilder.append("'").append(nameList[i]).append("',");
}
}
expr = expr + nameBuilder;
}
return expr;
}
public static IpLookupV2 getIpLookup() {
return ipLookup;
}
}

View File

@@ -0,0 +1,213 @@
package com.zdjizhi.tools.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.connections.zookeeper.DistributedLock;
import com.zdjizhi.tools.connections.zookeeper.ZookeeperUtils;
/**
* 雪花算法
*
* @author qidaijie
*/
public class SnowflakeId {
private static final Log logger = LogFactory.get();
/**
* 共64位 第一位为符号位 默认0
* 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :6位(0-63),
* workerId(关联进程):7(0-127) ,序列号11位(2047/ms)
*
* 序列号 /ms = (-1L ^ (-1L << 11))
* 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
*/
/**
* 开始时间截 (2020-11-14 00:00:00) max 17years
*/
private final long twepoch = 1693274481297L;
/**
* 机器id所占的位数
*/
private final long workerIdBits = 8L;
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 5L;
/**
* 支持的最大机器id结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* 支持的最大数据标识id结果是31
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* 序列在id中占的位数
*/
private final long sequenceBits = 11L;
/**
* 机器ID向左移12位
*/
private final long workerIdShift = sequenceBits;
/**
* 数据标识id向左移17位(14+6)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* 时间截向左移22位(4+6+14)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* 生成序列的掩码这里为2047
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* 工作机器ID(0~255)
*/
private long workerId;
/**
* 数据中心ID(0~31)
*/
private long dataCenterId;
/**
* 毫秒内序列(0~2047)
*/
private long sequence = 0L;
/**
* 上次生成ID的时间截
*/
private long lastTimestamp = -1L;
/**
* 设置允许时间回拨的最大限制10s
*/
private static final long rollBackTime = 10000L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
}
//==============================Constructors=====================================
/**
* 构造函数
*/
private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
try {
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterIdNum;
} catch (RuntimeException e) {
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}finally {
lock.unlock();
}
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
private synchronized long nextId() {
long timestamp = timeGen();
//设置一个允许回拨限制时间系统时间回拨范围在rollBackTime内可以等待校准
if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
timestamp = tilNextMillis(lastTimestamp);
}
//如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}
//上次生成ID的时间截
lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* 静态工具类
*
* @return
*/
public static Long generateId() {
return idWorker.nextId();
}
}