diff --git a/pom.xml b/pom.xml
index 773429f..f484859 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
log-completion-schema
- 220318-nacos
+ 220318-Nacos
log-completion-schema
http://www.example.com
@@ -270,12 +270,6 @@
2.4.0
-
- io.prometheus
- simpleclient_pushgateway
- 0.9.0
-
-
cn.hutool
hutool-all
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index d548d24..09c78df 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,15 +1,15 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-source.kafka.servers=192.168.44.12:9094
+source.kafka.servers=192.168.44.11:9094
#管理输出kafka地址
-sink.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=192.168.44.11:9094
#zookeeper 地址 用于配置log_id
-zookeeper.servers=192.168.44.12:2181
+zookeeper.servers=192.168.44.11:2181
#hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=192.168.44.12:2181
+hbase.zookeeper.servers=192.168.44.11:2181
#--------------------------------HTTP/定位库------------------------------#
#定位库地址
@@ -17,24 +17,21 @@ tools.library=D:\\workerspace\\dat\\
#--------------------------------nacos配置------------------------------#
#nacos 地址
-nacos.server=192.168.44.12:8848
+nacos.server=192.168.40.43:8848
#nacos namespace
nacos.schema.namespace=test
-#nacos topology_common_config.properties namespace
-nacos.common.namespace=test
-
#nacos data id
nacos.data.id=session_record.json
#--------------------------------Kafka消费/生产配置------------------------------#
#kafka 接收数据topic
-source.kafka.topic=SESSION-RECORD
+source.kafka.topic=test
#补全数据 输出 topic
-sink.kafka.topic=SESSION-RECORD-COMPLETED
+sink.kafka.topic=test-result
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
group.id=flinktest-1
@@ -42,13 +39,13 @@ group.id=flinktest-1
#--------------------------------topology配置------------------------------#
#consumer 并行度
-source.parallelism=9
+source.parallelism=1
#转换函数并行度
-transform.parallelism=27
+transform.parallelism=1
#kafka producer 并行度
-sink.parallelism=9
+sink.parallelism=1
#数据中心,取值范围(0-31)
data.center.id.num=0
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index bab2a29..5fd92ed 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -107,13 +107,6 @@ public class FlowWriteConfig {
/**
* common config
*/
- /**
- * public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
- * public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
- * public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
- * public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
- * public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
- */
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
diff --git a/src/main/java/com/zdjizhi/common/NacosConfig.java b/src/main/java/com/zdjizhi/common/NacosConfig.java
deleted file mode 100644
index 08bb92a..0000000
--- a/src/main/java/com/zdjizhi/common/NacosConfig.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.zdjizhi.common;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.common
- * @Description:
- * @date 2022/3/189:36
- */
-@Deprecated
-public class NacosConfig {
- private static final Log logger = LogFactory.get();
- private static Properties propCommon = new Properties();
- private static Properties propNacos = new Properties();
-
- private static NacosConfig nacosConfig;
-
- private static void getInstance() {
- nacosConfig = new NacosConfig();
- }
-
- /**
- * 构造函数-新
- */
- private NacosConfig() {
- //获取连接
- getConnection();
- }
-
- /**
- * 初始化Nacos配置列表
- */
- private static void getConnection() {
- try {
- propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
- propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_COMMON_NAMESPACE);
- propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
- propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
- ConfigService configService = NacosFactory.createConfigService(propNacos);
- String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
- if (StringUtil.isNotBlank(commonConfig)) {
- propCommon.load(new StringReader(commonConfig));
- }
- } catch (NacosException | IOException e) {
- logger.error("Get topology run configuration error,The exception message is :" + e.getMessage());
- }
- }
-
- /**
- * 获取String类型配置
- *
- * @param key config key
- * @return value
- */
- public static String getStringProperty(String key) {
-
- if (nacosConfig == null) {
- getInstance();
- }
- return propCommon.getProperty(key);
-
- }
-
- /**
- * 获取Integer类型配置
- *
- * @param key config key
- * @return value
- */
- public static Integer getIntegerProperty(String key) {
- if (nacosConfig == null) {
- getInstance();
- }
-
- return Integer.parseInt(propCommon.getProperty(key));
-
- }
-
-
- /**
- * 获取Long类型配置
- *
- * @param key config key
- * @return value
- */
- public static Long getLongProperty(String key) {
- if (nacosConfig == null) {
- getInstance();
- }
-
- return Long.parseLong(propCommon.getProperty(key));
-
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 801e07a..c98687b 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -27,8 +27,6 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- environment.enableCheckpointing(180 * 1000);
-
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index d251e22..d5ca29c 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -122,7 +122,7 @@ public class TransFormTypeMap {
break;
case "app_match":
if (logValue != null && appendToKeyValue == null) {
-// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
default:
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index bc9e893..8ed4286 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -64,8 +64,7 @@ class TransFunction {
byte[] dataBytes = String.valueOf(data).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
- BigInteger result = new BigInteger(decimalValue);
- return result;
+ return new BigInteger(decimalValue);
}
/**
@@ -75,7 +74,15 @@ class TransFunction {
* @return ip地址详细信息
*/
static String getGeoIpDetail(String ip) {
- return ipLookup.cityLookupDetail(ip);
+ try {
+ return ipLookup.cityLookupDetail(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get clientIP location error! " + e);
+ return "";
+ }
}
/**
@@ -85,7 +92,15 @@ class TransFunction {
* @return ASN
*/
static String getGeoAsn(String ip) {
- return ipLookup.asnLookup(ip);
+ try {
+ return ipLookup.asnLookup(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get IP ASN error! " + e);
+ return "";
+ }
}
/**
@@ -95,8 +110,15 @@ class TransFunction {
* @return 国家
*/
static String getGeoIpCountry(String ip) {
-
- return ipLookup.countryLookup(ip);
+ try {
+ return ipLookup.countryLookup(ip);
+ } catch (NullPointerException npe) {
+ logger.error("The MMDB file is not loaded or IP is null! " + npe);
+ return "";
+ } catch (RuntimeException e) {
+ logger.error("Get ServerIP location error! " + e);
+ return "";
+ }
}
@@ -137,7 +159,7 @@ class TransFunction {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
- logger.error("解析顶级域名异常,异常域名:" + domain);
+ logger.error("Parse top-level domain exceptions, exception domain names:" + domain);
return "";
}
}
@@ -159,8 +181,8 @@ class TransFunction {
result = Base64.decodeStr(message, charset.toString());
}
}
- } catch (RuntimeException rune) {
- logger.error("解析 Base64 异常,异常信息:" + rune);
+ } catch (RuntimeException e) {
+ logger.error("Resolve Base64 exception, exception information:" + e);
}
return result;
}
@@ -182,27 +204,11 @@ class TransFunction {
}
}
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
- logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
+ logger.error("The device label resolution exception or [expr] analytic expression error" + e);
}
return flattenResult;
}
-
- /**
- * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
- *
- * @param object 内存实体类
- * @param param 字段名/普通字符串
- * @return JSON.Value or String
- */
- static Object isJsonValue(Object object, String param) {
- if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
- return JsonParseUtil.getValue(object, param.substring(2));
- } else {
- return param;
- }
- }
-
/**
* 判断是否为日志字段,是则返回对应value,否则返回原始字符串
*
@@ -218,34 +224,6 @@ class TransFunction {
}
}
- /**
- * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
- *
- * @param object 内存实体类
- * @param ifParam 字段名/普通字符串
- * @return resultA or resultB or null
- */
- static Object condition(Object object, String ifParam) {
- Object result = null;
- try {
- String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
- if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
- String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
- Object direction = isJsonValue(object, norms[0]);
- Object resultA = isJsonValue(object, split[1]);
- Object resultB = isJsonValue(object, split[2]);
- if (direction instanceof Number) {
- result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
- } else if (direction instanceof String) {
- result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
- }
- }
- } catch (RuntimeException e) {
- logger.error("IF 函数执行异常,异常信息:" + e);
- }
- return result;
- }
-
/**
* IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
*
@@ -273,25 +251,4 @@ class TransFunction {
}
return result;
}
-
-
- /**
- * 设置固定值函数 若为数字则转为long返回
- *
- * @param param 默认值
- * @return 返回数字或字符串
- */
- static Object setValue(String param) {
- try {
- Matcher isNum = PATTERN.matcher(param);
- if (isNum.matches()) {
- return Long.parseLong(param);
- } else {
- return param;
- }
- } catch (RuntimeException e) {
- logger.error("SetValue 函数异常,异常信息:" + e);
- }
- return null;
- }
}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 6aa904f..adf2ef4 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -45,6 +45,7 @@ public class HBaseUtils {
getAll();
//定时更新
updateCache();
+
}
private static void getConnection() {
@@ -188,14 +189,21 @@ public class HBaseUtils {
* @return account
*/
public static String getAccount(String clientIp) {
-
- if (hBaseUtils == null) {
- getInstance();
+ if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ return subIdMap.get(clientIp);
}
- return subIdMap.get(clientIp);
-
+ return "";
}
+ /**
+ * 获取当前用户上下线状态信息
+ *
+ * @param result HBase内获取的数据
+ * @return 状态 1-上线 2-下线
+ */
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index f477848..ddb29ed 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -13,7 +13,6 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
-import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
@@ -56,7 +55,7 @@ public class JsonParseUtil {
String group = FlowWriteConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
- jsonFieldsMap = getMapFromHttp(schema);
+ jsonFieldsMap = getFieldsFromSchema(schema);
jobList = getJobListFromHttp(schema);
}
configService.addListener(dataId, group, new Listener() {
@@ -69,7 +68,7 @@ public class JsonParseUtil {
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
clearCache();
- jsonFieldsMap = getMapFromHttp(configMsg);
+ jsonFieldsMap = getFieldsFromSchema(configMsg);
jobList = getJobListFromHttp(configMsg);
}
}
@@ -126,23 +125,6 @@ public class JsonParseUtil {
return clazz;
}
- /**
- * 获取属性值的方法
- *
- * @param obj 对象
- * @param property key
- * @return 属性的值
- */
- public static Object getValue(Object obj, String property) {
- try {
- BeanMap beanMap = BeanMap.create(obj);
- return beanMap.get(property);
- } catch (RuntimeException e) {
- logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
- return null;
- }
- }
-
/**
* 获取属性值的方法
*
@@ -174,22 +156,6 @@ public class JsonParseUtil {
}
}
- /**
- * 更新属性值的方法
- *
- * @param obj 对象
- * @param property 更新的key
- * @param value 更新的值
- */
- public static void setValue(Object obj, String property, Object value) {
- try {
- BeanMap beanMap = BeanMap.create(obj);
- beanMap.put(property, value);
- } catch (ClassCastException e) {
- logger.error("赋予实体类错误类型数据", e);
- }
- }
-
/**
* 类型转换
*
@@ -240,7 +206,7 @@ public class JsonParseUtil {
*
* @return 用于反射生成schema类型的对象的一个map集合
*/
- private static HashMap getMapFromHttp(String schema) {
+ private static HashMap getFieldsFromSchema(String schema) {
HashMap map = new HashMap<>(16);
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 843028b..28ecff9 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -37,8 +37,8 @@ public class KafkaProducer {
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
FlowWriteConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
- //sink与所有分区建立连接,轮询写入;
createProducerConfig(),
+ //sink与所有分区建立连接,轮询写入;
Optional.empty());
//允许producer记录失败日志而不是捕获和抛出它们
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
index 9d91936..facffc7 100644
--- a/src/main/log4j.properties
+++ b/src/main/log4j.properties
@@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
-log4j.logger.com.nis.web.dao=debug
+log4j.logger.com.nis.web.dao=info
#bonecp数据源配置
-log4j.category.com.jolbox=debug,console
+log4j.category.com.jolbox=info,console
diff --git a/src/main/logback.xml b/src/main/logback.xml
index a508b6b..59095f6 100644
--- a/src/main/logback.xml
+++ b/src/main/logback.xml
@@ -32,7 +32,7 @@
-
+
diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java
index 170086c..9bd8e71 100644
--- a/src/test/java/com/zdjizhi/EncryptorTest.java
+++ b/src/test/java/com/zdjizhi/EncryptorTest.java
@@ -25,8 +25,8 @@ public class EncryptorTest {
System.out.println(encPin);
System.out.println(encUser);
// 再进行解密:raw_password
- String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ");
- String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw==");
+ String rawPwd = encryptor.decrypt("ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)");
+ String rawUser = encryptor.decrypt("ENC(nsyGpHKGFA4KW0zro9MDdw==)");
System.out.println("The username is: "+rawPwd);
System.out.println("The pin is: "+rawUser);
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
index 2dd5837..c667224 100644
--- a/src/test/java/com/zdjizhi/FunctionTest.java
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -17,10 +17,10 @@ import java.util.Calendar;
public class FunctionTest {
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
- .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
- .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
+// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
+// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
@@ -37,7 +37,7 @@ public class FunctionTest {
@Test
public void ipLookupTest() {
- String ip = "61.144.36.144";
+ String ip = "0.255.255.254";
System.out.println(ipLookup.cityLookupDetail(ip));
System.out.println(ipLookup.countryLookup(ip));
}
diff --git a/src/test/java/com/zdjizhi/json/JsonTest.java b/src/test/java/com/zdjizhi/json/JsonTest.java
new file mode 100644
index 0000000..597da40
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/JsonTest.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.json;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2022/5/5 15:08
+ */
+public class JsonTest {
+
+ @Test
+ public void JacksonTest() {
+ String value = "{\"common_log_id\":null}";
+ Map json = (Map) JsonMapper.fromJsonString(value, Map.class);
+ System.out.println(json.get("common_log_id"));
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
index 52b99e5..7745d5f 100644
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java
@@ -33,15 +33,15 @@ public class NacosTest {
/**
* config data id = config name
*/
- private static final String DATA_ID = "test";
+ private static final String DATA_ID = "dos_detection.properties";
/**
* config group
*/
private static final String GROUP = "Galaxy";
private void getProperties() {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.67:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "f507879a-8b1b-4330-913e-83d4fcdc14bb");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
}
@@ -56,6 +56,7 @@ public class NacosTest {
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
+ System.out.println(content);
} catch (NacosException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
index c81b809..741b2a3 100644
--- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java
+++ b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
@@ -30,8 +30,8 @@ public class SchemaListener {
static {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.40.43:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");