1: Remove the NacosConfig class.

2: Revise some of the exception messages.
3: Remove the checkpoint configuration.
This commit is contained in:
qidaijie
2022-06-22 11:21:22 +08:00
parent fdaa582229
commit 2e8c8a3dbd
17 changed files with 100 additions and 270 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
<version>220318-nacos</version>
<version>220318-Nacos</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -270,12 +270,6 @@
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>

View File

@@ -1,15 +1,15 @@
#--------------------------------Address configuration------------------------------#
#Source Kafka address
source.kafka.servers=192.168.44.12:9094
source.kafka.servers=192.168.44.11:9094
#Output (sink) Kafka address
sink.kafka.servers=192.168.44.12:9094
sink.kafka.servers=192.168.44.11:9094
#ZooKeeper address, used to configure log_id
zookeeper.servers=192.168.44.12:2181
zookeeper.servers=192.168.44.11:2181
#HBase ZooKeeper address, used to connect to HBase
hbase.zookeeper.servers=192.168.44.12:2181
hbase.zookeeper.servers=192.168.44.11:2181
#--------------------------------HTTP/location library------------------------------#
#Path to the IP location library
@@ -17,24 +17,21 @@ tools.library=D:\\workerspace\\dat\\
#--------------------------------Nacos configuration------------------------------#
#Nacos server address
nacos.server=192.168.44.12:8848
nacos.server=192.168.40.43:8848
#nacos namespace
nacos.schema.namespace=test
#nacos topology_common_config.properties namespace
nacos.common.namespace=test
#nacos data id
nacos.data.id=session_record.json
#--------------------------------Kafka consumer/producer configuration------------------------------#
#Kafka topic to consume data from
source.kafka.topic=SESSION-RECORD
source.kafka.topic=test
#Output topic for completed (enriched) data
sink.kafka.topic=SESSION-RECORD-COMPLETED
sink.kafka.topic=test-result
#Group id for consuming the topic; the consumption offsets for this spout id are stored under it (it can be named after the topology), and the stored offsets determine where the next read resumes so no data is consumed twice
group.id=flinktest-1
@@ -42,13 +39,13 @@ group.id=flinktest-1
#--------------------------------Topology configuration------------------------------#
#Consumer parallelism
source.parallelism=9
source.parallelism=1
#Transform function parallelism
transform.parallelism=27
transform.parallelism=1
#Kafka producer parallelism
sink.parallelism=9
sink.parallelism=1
#Data center id, valid range (0-31)
data.center.id.num=0
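For reference, a minimal sketch of how a job could load this file and read one of the parallelism values — assuming a plain java.util.Properties loader and a hypothetical file name; the project's actual FlowWriteConfigurations loader is not shown in this diff:

// Minimal sketch (assumption: plain java.util.Properties; the real loader may differ)
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class ConfigLoadSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("topology.properties")) {
            props.load(in);
        }
        // Missing keys fall back to a parallelism of 1
        int sourceParallelism = Integer.parseInt(props.getProperty("source.parallelism", "1"));
        System.out.println("source.parallelism = " + sourceParallelism);
    }
}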

View File

@@ -107,13 +107,6 @@ public class FlowWriteConfig {
/**
* common config
*/
/**
* public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
* public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
* public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
* public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
* public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
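The first argument to FlowWriteConfigurations.getStringProperty is not explained by this diff; a plausible guess is an index selecting among several loaded configuration sources. A purely hypothetical sketch of that shape:

// Hypothetical sketch only: the meaning of the int argument is an assumption
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public final class FlowWriteConfigurationsSketch {
    private static final List<Properties> SOURCES = new ArrayList<>();

    public static String getStringProperty(int sourceIndex, String key) {
        // Index 0 would be the first loaded source (e.g., the local properties file)
        return SOURCES.get(sourceIndex).getProperty(key);
    }
}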

View File

@@ -1,107 +0,0 @@
package com.zdjizhi.common;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.common
* @Description:
* @date 2022/3/18 9:36
*/
@Deprecated
public class NacosConfig {
private static final Log logger = LogFactory.get();
private static Properties propCommon = new Properties();
private static Properties propNacos = new Properties();
private static NacosConfig nacosConfig;
private static void getInstance() {
nacosConfig = new NacosConfig();
}
/**
* Constructor (new)
*/
private NacosConfig() {
//获取连接
getConnection();
}
/**
* Initialize the Nacos configuration list
*/
private static void getConnection() {
try {
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_COMMON_NAMESPACE);
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
ConfigService configService = NacosFactory.createConfigService(propNacos);
String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
if (StringUtil.isNotBlank(commonConfig)) {
propCommon.load(new StringReader(commonConfig));
}
} catch (NacosException | IOException e) {
logger.error("Get topology run configuration error,The exception message is " + e.getMessage());
}
}
/**
* Get a String-typed configuration value
*
* @param key config key
* @return value
*/
public static String getStringProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return propCommon.getProperty(key);
}
/**
* Get an Integer-typed configuration value
*
* @param key config key
* @return value
*/
public static Integer getIntegerProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return Integer.parseInt(propCommon.getProperty(key));
}
/**
* Get a Long-typed configuration value
*
* @param key config key
* @return value
*/
public static Long getLongProperty(String key) {
if (nacosConfig == null) {
getInstance();
}
return Long.parseLong(propCommon.getProperty(key));
}
}
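The deleted class above wrapped the standard Nacos client calls; any caller that still needs a one-off read can use the Nacos API directly, as the class itself did. A minimal sketch with placeholder server and namespace values:

// Minimal direct Nacos read (placeholder connection values)
import java.util.Properties;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;

public class NacosReadSketch {
    public static void main(String[] args) throws NacosException {
        Properties props = new Properties();
        props.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); // placeholder
        props.setProperty(PropertyKeyConst.NAMESPACE, "test");             // placeholder
        ConfigService configService = NacosFactory.createConfigService(props);
        // dataId, group, timeout in milliseconds — mirrors the deleted class's call
        String content = configService.getConfig("etl_connection_config.properties", "Galaxy", 5000);
        System.out.println(content);
    }
}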

View File

@@ -27,8 +27,6 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(180 * 1000);
//Maximum time between two output flushes (in milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
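Per item 3 of the commit message, the enableCheckpointing call is what this change drops; Flink leaves checkpointing off by default, so nothing replaces it. A sketch of the resulting setup, with a placeholder value standing in for FlowWriteConfig.BUFFER_TIMEOUT:

// Sketch: Flink checkpointing stays disabled unless explicitly enabled
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TopologySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Removed by this commit: env.enableCheckpointing(180 * 1000);
        env.setBufferTimeout(100); // placeholder for FlowWriteConfig.BUFFER_TIMEOUT
    }
}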

View File

@@ -122,7 +122,7 @@ public class TransFormTypeMap {
break;
case "app_match":
if (logValue != null && appendToKeyValue == null) {
// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
default:

View File

@@ -64,8 +64,7 @@ class TransFunction {
byte[] dataBytes = String.valueOf(data).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
BigInteger result = new BigInteger(decimalValue);
return result;
return new BigInteger(decimalValue);
}
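CityHash64 returns a signed Java long, so the code routes it through Long.toUnsignedString to preserve the full 64-bit unsigned value before wrapping it in a BigInteger. A small demonstration of why that detour matters:

import java.math.BigInteger;

public class UnsignedHashSketch {
    public static void main(String[] args) {
        long hashValue = -1L; // example hash with the sign bit set
        // The unsigned decimal string keeps the full 64-bit value...
        System.out.println(new BigInteger(Long.toUnsignedString(hashValue))); // 18446744073709551615
        // ...while a direct conversion keeps the sign
        System.out.println(BigInteger.valueOf(hashValue)); // -1
    }
}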
/**
@@ -75,7 +74,15 @@ class TransFunction {
* @return detailed IP geolocation info
*/
static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
try {
return ipLookup.cityLookupDetail(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get clientIP location error! " + e);
return "";
}
}
/**
@@ -85,7 +92,15 @@ class TransFunction {
* @return ASN
*/
static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip);
try {
return ipLookup.asnLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get IP ASN error! " + e);
return "";
}
}
/**
@@ -95,8 +110,15 @@ class TransFunction {
* @return country
*/
static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
try {
return ipLookup.countryLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) {
logger.error("Get ServerIP location error! " + e);
return "";
}
}
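The three lookup methods now repeat the same try/catch shape; if that duplication grows, it could be centralized. A hypothetical helper, not part of this commit:

// Hypothetical helper (not in this commit): one guard for all lookups
import java.util.function.Supplier;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;

final class SafeLookup {
    private static final Log logger = LogFactory.get();

    static String guard(Supplier<String> lookup, String errorLabel) {
        try {
            return lookup.get();
        } catch (NullPointerException npe) {
            logger.error("The MMDB file is not loaded or IP is null! " + npe);
            return "";
        } catch (RuntimeException e) {
            logger.error(errorLabel + " " + e);
            return "";
        }
    }
}
// Usage sketch: return SafeLookup.guard(() -> ipLookup.countryLookup(ip), "Get ServerIP location error!");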
@@ -137,7 +159,7 @@ class TransFunction {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + domain);
logger.error("Parse top-level domain exceptions, exception domain names:" + domain);
return "";
}
}
@@ -159,8 +181,8 @@ class TransFunction {
result = Base64.decodeStr(message, charset.toString());
}
}
} catch (RuntimeException rune) {
logger.error("解析 Base64 异常,异常信息:" + rune);
} catch (RuntimeException e) {
logger.error("Resolve Base64 exception, exception information:" + e);
}
return result;
}
@@ -182,27 +204,11 @@ class TransFunction {
}
}
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
logger.error("The device label resolution exception or [expr] analytic expression error" + e);
}
return flattenResult;
}
/**
* Determine whether the parameter is a log field; if so, return the corresponding value, otherwise return the original string
*
* @param object in-memory entity
* @param param  field name / plain string
* @return JSON.Value or String
*/
static Object isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
return JsonParseUtil.getValue(object, param.substring(2));
} else {
return param;
}
}
/**
* Determine whether the parameter is a log field; if so, return the corresponding value, otherwise return the original string
*
@@ -218,34 +224,6 @@ class TransFunction {
}
}
/**
* IF-function implementation: parses the log and builds a ternary expression; if the operand is numeric, it is converted to long before the result is returned.
*
* @param object  in-memory entity
* @param ifParam field name / plain string
* @return resultA or resultB or null
*/
static Object condition(Object object, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(object, norms[0]);
Object resultA = isJsonValue(object, split[1]);
Object resultB = isJsonValue(object, split[2]);
if (direction instanceof Number) {
result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return result;
}
/**
* IF-function implementation: parses the log and builds a ternary expression; if the operand is numeric, it is converted to long before the result is returned.
*
@@ -273,25 +251,4 @@ class TransFunction {
}
return result;
}
/**
* Fixed-value setter; numeric values are returned as long
*
* @param param default value
* @return a number or a string
*/
static Object setValue(String param) {
try {
Matcher isNum = PATTERN.matcher(param);
if (isNum.matches()) {
return Long.parseLong(param);
} else {
return param;
}
} catch (RuntimeException e) {
logger.error("SetValue 函数异常,异常信息:" + e);
}
return null;
}
}

View File

@@ -45,6 +45,7 @@ public class HBaseUtils {
getAll();
//Scheduled refresh
updateCache();
}
private static void getConnection() {
@@ -188,14 +189,21 @@ public class HBaseUtils {
* @return account
*/
public static String getAccount(String clientIp) {
if (hBaseUtils == null) {
getInstance();
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
if (hBaseUtils == null) {
getInstance();
}
return subIdMap.get(clientIp);
}
return subIdMap.get(clientIp);
return "";
}
/**
* Get the current user's online/offline status
*
* @param result data fetched from HBase
* @return status: 1 = online, 2 = offline
*/
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {

View File

@@ -13,7 +13,6 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
@@ -56,7 +55,7 @@ public class JsonParseUtil {
String group = FlowWriteConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
jsonFieldsMap = getMapFromHttp(schema);
jsonFieldsMap = getFieldsFromSchema(schema);
jobList = getJobListFromHttp(schema);
}
configService.addListener(dataId, group, new Listener() {
@@ -69,7 +68,7 @@ public class JsonParseUtil {
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
clearCache();
jsonFieldsMap = getMapFromHttp(configMsg);
jsonFieldsMap = getFieldsFromSchema(configMsg);
jobList = getJobListFromHttp(configMsg);
}
}
@@ -126,23 +125,6 @@ public class JsonParseUtil {
return clazz;
}
/**
* Method to get a property value
*
* @param obj      object
* @param property key
* @return the property's value
*/
public static Object getValue(Object obj, String property) {
try {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* Method to get a property value
*
@@ -174,22 +156,6 @@ public class JsonParseUtil {
}
}
/**
* Method to update a property value
*
* @param obj      object
* @param property key to update
* @param value    new value
*/
public static void setValue(Object obj, String property, Object value) {
try {
BeanMap beanMap = BeanMap.create(obj);
beanMap.put(property, value);
} catch (ClassCastException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
* Type conversion
*
@@ -240,7 +206,7 @@ public class JsonParseUtil {
*
* @return a map used to reflectively create objects of the schema field types
*/
private static HashMap<String, Class> getMapFromHttp(String schema) {
private static HashMap<String, Class> getFieldsFromSchema(String schema) {
HashMap<String, Class> map = new HashMap<>(16);
//Get the fields and convert them to an array; each element holds a name, doc, and type
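The name/doc/type triple suggests an Avro-style schema document; with the com.jayway.jsonpath dependency already imported above, the field names could be pulled out as below — the exact schema layout is an assumption:

// Sketch: extract field names from an Avro-style schema (layout assumed)
import java.util.List;
import com.jayway.jsonpath.JsonPath;

public class SchemaFieldsSketch {
    public static void main(String[] args) {
        String schema = "{\"fields\":[{\"name\":\"common_log_id\",\"doc\":\"\",\"type\":\"string\"}]}";
        List<String> names = JsonPath.read(schema, "$.fields[*].name");
        System.out.println(names); // [common_log_id]
    }
}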

View File

@@ -37,8 +37,8 @@ public class KafkaProducer {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
//The sink connects to all partitions and writes round-robin
createProducerConfig(),
//The sink connects to all partitions and writes round-robin
Optional.empty());
//Allow the producer to log failures instead of catching and rethrowing them
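With Optional.empty() passed as the custom partitioner, this FlinkKafkaProducer constructor delegates partitioning to the Kafka client, which spreads unkeyed records across all partitions as the relocated comment says. The createProducerConfig() it calls is not shown in this diff; a minimal sketch with a placeholder broker address:

// Minimal sketch of a producer config (placeholder broker; the real method may set more keys)
import java.util.Properties;

public class ProducerConfigSketch {
    static Properties createProducerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // placeholder
        props.setProperty("acks", "1");
        return props;
    }
}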

View File

@@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis configuration; com.nis.web.dao is the package containing the MyBatis interfaces
log4j.logger.com.nis.web.dao=debug
log4j.logger.com.nis.web.dao=info
#BoneCP data source configuration
log4j.category.com.jolbox=debug,console
log4j.category.com.jolbox=info,console

View File

@@ -32,7 +32,7 @@
</encoder>
</appender>
<!-- project default level项目输出的日志级别 -->
<logger name="com.example.demo" level="DEBUG" />
<logger name="com.example.demo" level="INFO" />
<!-- 日志输出级别 常用的日志级别按照从高到低依次为ERROR、WARN、INFO、DEBUG。 -->
<root level="INFO">

View File

@@ -25,8 +25,8 @@ public class EncryptorTest {
System.out.println(encPin);
System.out.println(encUser);
// Then decrypt (raw_password)
String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ");
String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw==");
String rawPwd = encryptor.decrypt("ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)");
String rawUser = encryptor.decrypt("ENC(nnasyGpHKGFA4KW0zro9MDdw==)");
System.out.println("The username is: "+rawPwd);
System.out.println("The pin is: "+rawUser);

View File

@@ -17,10 +17,10 @@ import java.util.Calendar;
public class FunctionTest {
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
.loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
@@ -37,7 +37,7 @@ public class FunctionTest {
@Test
public void ipLookupTest() {
String ip = "61.144.36.144";
String ip = "0.255.255.254";
System.out.println(ipLookup.cityLookupDetail(ip));
System.out.println(ipLookup.countryLookup(ip));
}

View File

@@ -0,0 +1,23 @@
package com.zdjizhi.json;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.junit.Test;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.json
* @Description:
* @date 2022/5/5 15:08
*/
public class JsonTest {
@Test
public void JacksonTest() {
String value = "{\"common_log_id\":null}";
Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
System.out.println(json.get("common_log_id"));
}
}

View File

@@ -33,15 +33,15 @@ public class NacosTest {
/**
* config data id = config name
*/
private static final String DATA_ID = "test";
private static final String DATA_ID = "dos_detection.properties";
/**
* config group
*/
private static final String GROUP = "Galaxy";
private void getProperties() {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.67:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "f507879a-8b1b-4330-913e-83d4fcdc14bb");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
}
@@ -56,6 +56,7 @@ public class NacosTest {
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
System.out.println(content);
} catch (NacosException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();

View File

@@ -30,8 +30,8 @@ public class SchemaListener {
static {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.40.43:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");