提交Nacos动态获取schema功能。(GAL-144)
This commit is contained in:
3
pom.xml
3
pom.xml
@@ -38,6 +38,7 @@
|
||||
<kafka.version>1.0.0</kafka.version>
|
||||
<hbase.version>2.2.3</hbase.version>
|
||||
<nacos.version>1.2.0</nacos.version>
|
||||
<zdjz.tools.version>1.0.8</zdjz.tools.version>
|
||||
<scope.type>provided</scope.type>
|
||||
<!--<scope.type>compile</scope.type>-->
|
||||
</properties>
|
||||
@@ -116,7 +117,7 @@
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
<version>1.0.8</version>
|
||||
<version>${zdjz.tools.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
|
||||
@@ -1,3 +1,20 @@
|
||||
#--------------------------------地址配置------------------------------#
|
||||
#管理kafka地址
|
||||
source.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#管理输出kafka地址
|
||||
sink.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#zookeeper 地址 用于配置log_id
|
||||
zookeeper.servers=192.168.44.12:2181
|
||||
|
||||
#hbase zookeeper地址 用于连接HBase
|
||||
hbase.zookeeper.servers=192.168.44.12:2181
|
||||
|
||||
#--------------------------------HTTP/定位库------------------------------#
|
||||
#定位库地址
|
||||
tools.library=D:\\workerspace\\dat\\
|
||||
|
||||
#--------------------------------nacos配置------------------------------#
|
||||
#nacos 地址
|
||||
nacos.server=192.168.44.12:8848
|
||||
@@ -6,7 +23,7 @@ nacos.server=192.168.44.12:8848
|
||||
nacos.schema.namespace=test
|
||||
|
||||
#nacos topology_common_config.properties namespace
|
||||
nacos.common.namespace=flink
|
||||
nacos.common.namespace=test
|
||||
|
||||
#nacos data id
|
||||
nacos.data.id=session_record.json
|
||||
|
||||
@@ -66,14 +66,12 @@ public class FlowWriteConfig {
|
||||
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
|
||||
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
|
||||
|
||||
|
||||
/**
|
||||
* kafka common
|
||||
*/
|
||||
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
|
||||
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
|
||||
|
||||
|
||||
/**
|
||||
* kafka source config
|
||||
*/
|
||||
@@ -83,7 +81,6 @@ public class FlowWriteConfig {
|
||||
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
|
||||
public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
|
||||
|
||||
|
||||
/**
|
||||
* kafka sink config
|
||||
*/
|
||||
@@ -110,11 +107,17 @@ public class FlowWriteConfig {
|
||||
/**
|
||||
* common config
|
||||
*/
|
||||
public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
|
||||
public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("etl.sink.kafka.servers");
|
||||
public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
|
||||
public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
|
||||
public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
|
||||
|
||||
/**
|
||||
* public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
|
||||
* public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
|
||||
* public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
|
||||
* public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
|
||||
* public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
|
||||
*/
|
||||
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
|
||||
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
|
||||
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
|
||||
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
|
||||
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
|
||||
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import java.util.Properties;
|
||||
* @Description:
|
||||
* @date 2022/3/189:36
|
||||
*/
|
||||
@Deprecated
|
||||
public class NacosConfig {
|
||||
private static final Log logger = LogFactory.get();
|
||||
private static Properties propCommon = new Properties();
|
||||
@@ -48,7 +49,7 @@ public class NacosConfig {
|
||||
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
|
||||
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
|
||||
ConfigService configService = NacosFactory.createConfigService(propNacos);
|
||||
String commonConfig = configService.getConfig("topology_common_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
|
||||
String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
|
||||
if (StringUtil.isNotBlank(commonConfig)) {
|
||||
propCommon.load(new StringReader(commonConfig));
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ public class LogFlowWriteTopology {
|
||||
public static void main(String[] args) {
|
||||
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
environment.enableCheckpointing(180 * 1000);
|
||||
|
||||
//两个输出之间的最大时间 (单位milliseconds)
|
||||
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
|
||||
|
||||
|
||||
@@ -12,16 +12,12 @@ import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.common.NacosConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.system.FlowWriteConfigurations;
|
||||
import net.sf.cglib.beans.BeanGenerator;
|
||||
import net.sf.cglib.beans.BeanMap;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static com.zdjizhi.utils.json.JsonTypeUtils.*;
|
||||
|
||||
/**
|
||||
* 使用FastJson解析json的工具类
|
||||
@@ -36,10 +32,12 @@ public class JsonParseUtil {
|
||||
* 获取需要删除字段的列表
|
||||
*/
|
||||
private static ArrayList<String> dropList = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 在内存中加载反射类用的map
|
||||
*/
|
||||
private static HashMap<String, Class> map;
|
||||
private static HashMap<String, Class> jsonFieldsMap;
|
||||
|
||||
/**
|
||||
* 获取任务列表
|
||||
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
|
||||
@@ -58,8 +56,8 @@ public class JsonParseUtil {
|
||||
String group = FlowWriteConfig.NACOS_GROUP;
|
||||
String schema = configService.getConfig(dataId, group, 5000);
|
||||
if (StringUtil.isNotBlank(schema)) {
|
||||
jsonFieldsMap = getMapFromHttp(schema);
|
||||
jobList = getJobListFromHttp(schema);
|
||||
map = getMapFromHttp(schema);
|
||||
}
|
||||
configService.addListener(dataId, group, new Listener() {
|
||||
@Override
|
||||
@@ -70,7 +68,8 @@ public class JsonParseUtil {
|
||||
@Override
|
||||
public void receiveConfigInfo(String configMsg) {
|
||||
if (StringUtil.isNotBlank(configMsg)) {
|
||||
map = getMapFromHttp(configMsg);
|
||||
clearCache();
|
||||
jsonFieldsMap = getMapFromHttp(configMsg);
|
||||
jobList = getJobListFromHttp(configMsg);
|
||||
}
|
||||
}
|
||||
@@ -200,29 +199,29 @@ public class JsonParseUtil {
|
||||
JsonParseUtil.dropJsonField(jsonMap);
|
||||
HashMap<String, Object> tmpMap = new HashMap<>(192);
|
||||
for (String key : jsonMap.keySet()) {
|
||||
if (map.containsKey(key)) {
|
||||
String simpleName = map.get(key).getSimpleName();
|
||||
if (jsonFieldsMap.containsKey(key)) {
|
||||
String simpleName = jsonFieldsMap.get(key).getSimpleName();
|
||||
switch (simpleName) {
|
||||
case "String":
|
||||
tmpMap.put(key, checkString(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
|
||||
break;
|
||||
case "Integer":
|
||||
tmpMap.put(key, getIntValue(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
|
||||
break;
|
||||
case "long":
|
||||
tmpMap.put(key, checkLongValue(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
|
||||
break;
|
||||
case "List":
|
||||
tmpMap.put(key, checkArray(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
|
||||
break;
|
||||
case "Map":
|
||||
tmpMap.put(key, checkObject(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
|
||||
break;
|
||||
case "double":
|
||||
tmpMap.put(key, checkDouble(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
|
||||
break;
|
||||
default:
|
||||
tmpMap.put(key, checkString(jsonMap.get(key)));
|
||||
tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,7 +240,7 @@ public class JsonParseUtil {
|
||||
*
|
||||
* @return 用于反射生成schema类型的对象的一个map集合
|
||||
*/
|
||||
public static HashMap<String, Class> getMapFromHttp(String schema) {
|
||||
private static HashMap<String, Class> getMapFromHttp(String schema) {
|
||||
HashMap<String, Class> map = new HashMap<>(16);
|
||||
|
||||
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
|
||||
@@ -298,12 +297,12 @@ public class JsonParseUtil {
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
|
||||
* 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
|
||||
*
|
||||
* @param schema 网关url
|
||||
* @param schema 日志schema
|
||||
* @return 任务列表
|
||||
*/
|
||||
public static ArrayList<String[]> getJobListFromHttp(String schema) {
|
||||
private static ArrayList<String[]> getJobListFromHttp(String schema) {
|
||||
ArrayList<String[]> list = new ArrayList<>();
|
||||
|
||||
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
|
||||
@@ -361,4 +360,13 @@ public class JsonParseUtil {
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 在配置变动时,清空缓存重新获取
|
||||
*/
|
||||
private static void clearCache() {
|
||||
jobList.clear();
|
||||
jsonFieldsMap.clear();
|
||||
dropList.clear();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,24 +1,11 @@
|
||||
package com.zdjizhi.utils.json;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.nacos.api.NacosFactory;
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.exception.FlowWriteException;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static com.zdjizhi.utils.json.JsonParseUtil.getJobListFromHttp;
|
||||
import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
@@ -26,7 +13,7 @@ import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp;
|
||||
* @Description:
|
||||
* @date 2021/7/1217:34
|
||||
*/
|
||||
public class JsonTypeUtils {
|
||||
public class JsonTypeUtil {
|
||||
/**
|
||||
* String 类型检验转换方法
|
||||
*
|
||||
@@ -40,6 +40,7 @@ public class KafkaConsumer {
|
||||
*
|
||||
* @return kafka logs -> map
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
|
||||
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
|
||||
new TimestampDeserializationSchema(), createConsumerConfig());
|
||||
@@ -62,7 +63,10 @@ public class KafkaConsumer {
|
||||
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
|
||||
new SimpleStringSchema(), createConsumerConfig());
|
||||
|
||||
//随着checkpoint提交,将offset提交到kafka
|
||||
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
|
||||
|
||||
//从消费组当前的offset开始消费
|
||||
kafkaConsumer.setStartFromGroupOffsets();
|
||||
|
||||
return kafkaConsumer;
|
||||
|
||||
79
src/test/java/com/zdjizhi/json/JsonPathTest.java
Normal file
79
src/test/java/com/zdjizhi/json/JsonPathTest.java
Normal file
@@ -0,0 +1,79 @@
|
||||
package com.zdjizhi.json;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.nacos.api.NacosFactory;
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.json
|
||||
* @Description:
|
||||
* @date 2022/3/2410:22
|
||||
*/
|
||||
public class JsonPathTest {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
private static Properties propNacos = new Properties();
|
||||
|
||||
/**
|
||||
* 获取需要删除字段的列表
|
||||
*/
|
||||
private static ArrayList<String> dropList = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 在内存中加载反射类用的map
|
||||
*/
|
||||
private static HashMap<String, Class> map;
|
||||
|
||||
/**
|
||||
* 获取任务列表
|
||||
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
|
||||
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
|
||||
*/
|
||||
private static ArrayList<String[]> jobList;
|
||||
|
||||
private static String schema;
|
||||
|
||||
static {
|
||||
propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
|
||||
propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
|
||||
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
|
||||
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
|
||||
try {
|
||||
ConfigService configService = NacosFactory.createConfigService(propNacos);
|
||||
String dataId = FlowWriteConfig.NACOS_DATA_ID;
|
||||
String group = FlowWriteConfig.NACOS_GROUP;
|
||||
String config = configService.getConfig(dataId, group, 5000);
|
||||
if (StringUtil.isNotBlank(config)) {
|
||||
schema = config;
|
||||
}
|
||||
} catch (NacosException e) {
|
||||
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseSchemaGetFields() {
|
||||
DocumentContext parse = JsonPath.parse(schema);
|
||||
List<Object> fields = parse.read("$.fields[*]");
|
||||
for (Object field : fields) {
|
||||
String name = JsonPath.read(field, "$.name").toString();
|
||||
String type = JsonPath.read(field, "$.type").toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,8 +5,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.system.FlowWriteConfigurations;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -22,10 +20,26 @@ import java.util.concurrent.Executor;
|
||||
* @date 2022/3/1016:58
|
||||
*/
|
||||
public class NacosTest {
|
||||
private static Properties properties = new Properties();
|
||||
|
||||
@Test
|
||||
public void getProperties() {
|
||||
/**
|
||||
* <dependency>
|
||||
* <groupId>com.alibaba.nacos</groupId>
|
||||
* <artifactId>nacos-client</artifactId>
|
||||
* <version>1.2.0</version>
|
||||
* </dependency>
|
||||
*/
|
||||
|
||||
private static Properties properties = new Properties();
|
||||
/**
|
||||
* config data id = config name
|
||||
*/
|
||||
private static final String DATA_ID = "test";
|
||||
/**
|
||||
* config group
|
||||
*/
|
||||
private static final String GROUP = "Galaxy";
|
||||
|
||||
private void getProperties() {
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
|
||||
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
|
||||
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
|
||||
@@ -38,11 +52,10 @@ public class NacosTest {
|
||||
try {
|
||||
getProperties();
|
||||
ConfigService configService = NacosFactory.createConfigService(properties);
|
||||
String content = configService.getConfig("topology_common_config.properties", "Galaxy", 5000);
|
||||
String content = configService.getConfig(DATA_ID, GROUP, 5000);
|
||||
Properties nacosConfigMap = new Properties();
|
||||
nacosConfigMap.load(new StringReader(content));
|
||||
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
|
||||
System.out.println(nacosConfigMap.getProperty("schema.http"));
|
||||
} catch (NacosException | IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
@@ -53,16 +66,14 @@ public class NacosTest {
|
||||
@Test
|
||||
public void ListenerConfigurationTest() {
|
||||
getProperties();
|
||||
ConfigService configService = null;
|
||||
try {
|
||||
configService = NacosFactory.createConfigService(properties);
|
||||
String content = configService.getConfig("ETL-SESSION-RECORD-COMPLETED", "etl", 5000);
|
||||
//first get config
|
||||
ConfigService configService = NacosFactory.createConfigService(properties);
|
||||
String config = configService.getConfig(DATA_ID, GROUP, 5000);
|
||||
System.out.println(config);
|
||||
|
||||
Properties nacosConfigMap = new Properties();
|
||||
nacosConfigMap.load(new StringReader(content));
|
||||
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
|
||||
|
||||
configService.addListener("ETL-SESSION-RECORD-COMPLETED", "etl", new Listener() {
|
||||
//start listenner
|
||||
configService.addListener(DATA_ID, GROUP, new Listener() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return null;
|
||||
@@ -70,17 +81,20 @@ public class NacosTest {
|
||||
|
||||
@Override
|
||||
public void receiveConfigInfo(String configMsg) {
|
||||
try {
|
||||
Properties nacosConfigMap = new Properties();
|
||||
nacosConfigMap.load(new StringReader(configMsg));
|
||||
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.out.println(configMsg);
|
||||
}
|
||||
});
|
||||
} catch (NacosException | IOException e) {
|
||||
} catch (NacosException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
//keep running,change nacos config,print new config
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,16 +9,11 @@ import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import com.zdjizhi.utils.json.JsonTypeUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@@ -62,11 +57,17 @@ public class SchemaListener {
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void dealCommonMessage() {
|
||||
|
||||
System.out.println(Arrays.toString(jobList.get(0)));
|
||||
|
||||
@Test
|
||||
public void dealCommonMessage() {
|
||||
//keep running,change nacos config,print new config
|
||||
while (true) {
|
||||
try {
|
||||
System.out.println(Arrays.toString(jobList.get(0)));
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -74,12 +75,9 @@ public class SchemaListener {
|
||||
*
|
||||
* @return 任务列表
|
||||
*/
|
||||
public static ArrayList<String[]> getJobListFromHttp(String schema) {
|
||||
private static ArrayList<String[]> getJobListFromHttp(String schema) {
|
||||
ArrayList<String[]> list = new ArrayList<>();
|
||||
|
||||
//解析data
|
||||
// Object data = JSON.parseObject(schema).get("data");
|
||||
|
||||
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
|
||||
JSONObject schemaJson = JSON.parseObject(schema);
|
||||
JSONArray fields = (JSONArray) schemaJson.get("fields");
|
||||
|
||||
Reference in New Issue
Block a user