diff --git a/pom.xml b/pom.xml index 08db21b..3222397 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-stream-doublewrite - 22.04 + 22.04-v3 log-stream-doublewrite http://www.example.com @@ -39,7 +39,7 @@ 2.2.3 1.2.0 1.0.8 - + provided diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 241d28c..06a33b3 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,35 +1,35 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=10.3.60.3:9094 +source.kafka.servers=192.168.44.12:9094 #百分点输出kafka地址 -percent.sink.kafka.servers=10.3.45.126:6667,10.3.45.127:6667,10.3.45.128:6667 +percent.sink.kafka.servers=192.168.44.12:9094 #文件源数据topic输出kafka地址 -file.data.sink.kafka.servers=10.3.60.3:9094 +file.data.sink.kafka.servers=192.168.44.12:9094 #zookeeper 地址 用于配置log_id -zookeeper.servers=10.3.60.3:2181 +zookeeper.servers=192.168.44.12:2181 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=10.3.60.3:2181 +hbase.zookeeper.servers=192.168.44.12:2181 #--------------------------------HTTP/定位库------------------------------# #定位库地址 -tools.library=/opt/dat/ +tools.library=C:\\workspace\\dat\\ #--------------------------------nacos配置------------------------------# #nacos 地址 -nacos.server=10.3.60.3:8848 +nacos.server=192.168.44.12:8848 #nacos namespace -nacos.schema.namespace=prod +nacos.schema.namespace=P19 #nacos topology_common_config.properties namespace -nacos.common.namespace=prod +nacos.common.namespace=P19 #nacos data id -nacos.data.id=session_record.json +nacos.data.id=security_event.json #------------------------------------OOS配置------------------------------------# #oos地址 @@ -38,10 +38,10 @@ oos.servers=10.3.45.100:8057 #--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic -source.kafka.topic=SESSION-RECORD +source.kafka.topic=test #百分点对应的topic -percent.kafka.topic=SESSION-RECORD +percent.kafka.topic=PERCENT-RECORD #文件源数据topic file.data.kafka.topic=test-file-data diff --git a/src/main/java/com/zdjizhi/bean/FileMeta.java b/src/main/java/com/zdjizhi/bean/FileMeta.java index e24e0b4..96a18ea 100644 --- a/src/main/java/com/zdjizhi/bean/FileMeta.java +++ b/src/main/java/com/zdjizhi/bean/FileMeta.java @@ -6,7 +6,7 @@ public class FileMeta { private long common_log_id; protected int common_recv_time; private String common_schema_type; - private JSONArray sourceList; + private JSONArray source_list; private int processing_time; public long getCommon_log_id() { @@ -33,12 +33,12 @@ public class FileMeta { this.common_schema_type = common_schema_type; } - public JSONArray getSourceList() { - return sourceList; + public JSONArray getSource_list() { + return source_list; } - public void setSourceList(JSONArray sourceList) { - this.sourceList = sourceList; + public void setSource_list(JSONArray source_list) { + this.source_list = source_list; } public int getProcessing_time() { diff --git a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java index a90f2f2..bec1fe5 100644 --- a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java @@ -10,7 +10,6 @@ import com.zdjizhi.bean.SourceList; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.FileEdit; -import com.zdjizhi.utils.json.JsonTypeUtil; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; @@ -53,56 +52,50 @@ public class DealFileProcessFunction extends ProcessFunction public void processElement(Map message, Context context, Collector collector) throws Exception { try { if (message.size() > 0) { -// jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); -// jsonMap = JsonTypeUtil.typeTransform(map); + rpUrlValue = (String) message.get("http_response_body"); rqUrlValue = (String) message.get("http_request_body"); emailUrlValue = (String) message.get("mail_eml_file"); - if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) { - cfgId = (long) message.get("common_policy_id"); + cfgId = (long) message.getOrDefault("common_policy_id",0L); sIp = (String) message.get("common_client_ip"); sPort = (int) message.get("common_client_port"); dIp = (String) message.get("common_server_ip"); dPort = (int) message.get("common_server_port"); foundTime = (long) message.get("common_recv_time"); schemaType = (String) message.get("common_schema_type"); + domain = (String)message.getOrDefault("http_domain",""); + account = (String)message.getOrDefault("common_subscribe_id",""); - if (StringUtil.isNotBlank((String) message.get("http_domain"))) { - domain = message.get("http_domain").toString(); - } else { - domain = "NA"; - } - if (StringUtil.isNotBlank((String) message.get("common_subscribe_id"))) { - account = message.get("common_subscribe_id").toString(); - } else { - account = "NA"; - } FileMeta fileMeta = new FileMeta(); JSONArray jsonarray = new JSONArray(); if (StringUtil.isNotBlank(rqUrlValue)) { - message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1")); + System.out.println(rqUrlValue); + String fileId = FileEdit.getFileId(rqUrlValue,"_1"); + message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId)); SourceList request = new SourceList(); request.setSource_oss_path(rqUrlValue); - request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1")); + request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId)); jsonarray.add(request); } if (StringUtil.isNotBlank(rpUrlValue)) { - message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2")); + String fileId = FileEdit.getFileId(rpUrlValue,"_2"); + message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId)); SourceList response = new SourceList(); response.setSource_oss_path(rpUrlValue); - response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2")); + response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId)); jsonarray.add(response); } if (StringUtil.isNotBlank(emailUrlValue)) { - message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9")); + String fileId = FileEdit.getFileId(emailUrlValue,"_9"); + message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId)); SourceList emailFile = new SourceList(); emailFile.setSource_oss_path(emailUrlValue); - emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9")); + emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId)); jsonarray.add(emailFile); } - fileMeta.setSourceList(jsonarray); + fileMeta.setSource_list(jsonarray); fileMeta.setCommon_log_id((long) message.get("common_log_id")); fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString())); fileMeta.setCommon_schema_type((String) message.get("common_schema_type")); diff --git a/src/main/java/com/zdjizhi/utils/general/FileEdit.java b/src/main/java/com/zdjizhi/utils/general/FileEdit.java index 8c3da79..144dedd 100644 --- a/src/main/java/com/zdjizhi/utils/general/FileEdit.java +++ b/src/main/java/com/zdjizhi/utils/general/FileEdit.java @@ -12,7 +12,7 @@ import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType; public class FileEdit { - public static String fileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileSuffix) throws Exception { + public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){ String fileType = null; if (judgeFileType(getFileType(urlValue))){ fileType = getFileType(urlValue); @@ -24,11 +24,12 @@ public class FileEdit { fileType = "eml"; } } - return "http://"+ FlowWriteConfig.OOS_SERVERS+"/upload_v2"+"/"+cfgId+"/"+fileType+"/"+sIp+"/"+sPort+"/"+dIp+"/"+dPort+"/"+foundTime+"/"+account+"/"+domain+"/"+getFileName(urlValue,fileSuffix); + + return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account; } - public static String fileDownloadUrl( String urlValue,String fileSuffix) throws Exception { - return "http://"+ FlowWriteConfig.OOS_SERVERS+"/download_v2"+"/"+getFileName(urlValue,fileSuffix); + public static String getFileDownloadUrl(String fileId){ + return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId; } @@ -37,7 +38,8 @@ public class FileEdit { return split[split.length-1]; } - public static String getFileName(String url,String fileSuffix) throws Exception { + public static String getFileId(String url,String fileSuffix) throws Exception { + String[] arr = url.split("/"); String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); String prefix = MD5Utils.md5Encode(filename); diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java index 6023dd5..9aa54c8 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -44,7 +44,7 @@ public class TransFormMap { } catch (RuntimeException e) { logger.error("TransForm logs failed,The exception is :" + e); return null; - } + } } diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java deleted file mode 100644 index 170086c..0000000 --- a/src/test/java/com/zdjizhi/EncryptorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.zdjizhi; - -import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; -import org.junit.Test; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2022/3/1610:55 - */ -public class EncryptorTest { - - - @Test - public void passwordTest(){ - StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - // 配置加密解密的密码/salt值 - encryptor.setPassword("galaxy"); - // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p - String pin = "galaxy2019"; - String encPin = encryptor.encrypt(pin); - String user = "admin"; - String encUser = encryptor.encrypt(user); - System.out.println(encPin); - System.out.println(encUser); - // 再进行解密:raw_password - String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ"); - String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw=="); - - System.out.println("The username is: "+rawPwd); - System.out.println("The pin is: "+rawUser); - } - -} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java deleted file mode 100644 index 2dd5837..0000000 --- a/src/test/java/com/zdjizhi/FunctionTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.IpLookupV2; -import com.zdjizhi.utils.general.CityHash; -import org.junit.Test; - -import java.math.BigInteger; -import java.util.Calendar; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/11/611:38 - */ -public class FunctionTest { - - private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") - .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") - .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") - .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") - .build(); - - @Test - public void CityHashTest() { - - byte[] dataBytes = String.valueOf(613970406986188816L).getBytes(); - long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); - String decimalValue = Long.toUnsignedString(hashValue, 10); - BigInteger result = new BigInteger(decimalValue); - System.out.println(result); - } - - @Test - public void ipLookupTest() { - String ip = "61.144.36.144"; - System.out.println(ipLookup.cityLookupDetail(ip)); - System.out.println(ipLookup.countryLookup(ip)); - } - - @Test - public void timestampTest(){ - Calendar cal = Calendar.getInstance(); - Long utcTime=cal.getTimeInMillis(); - System.out.println(utcTime); - System.out.println(System.currentTimeMillis()); - } -} diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java deleted file mode 100644 index 5f94e32..0000000 --- a/src/test/java/com/zdjizhi/HBaseTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.zdjizhi; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -import java.io.IOException; -import java.util.Arrays; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/12/310:42 - */ -public class HBaseTest { - - @Test - public void getColumn() { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); - configuration.set("hbase.client.retries.number", "3"); - configuration.set("hbase.bulkload.retries.number", "3"); - configuration.set("zookeeper.recovery.retry", "3"); - try { - Connection connection = ConnectionFactory.createConnection(configuration); - Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - int acctStatusType; - boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); - if (hasType) { - acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); - } else { - acctStatusType = 3; - } - String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); - String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); - System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); -// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); - } - } catch (IOException e) { - e.printStackTrace(); - } - } -} diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java deleted file mode 100644 index cd7ada3..0000000 --- a/src/test/java/com/zdjizhi/json/JsonPathTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.zdjizhi.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - * @author qidaijie - * @Package com.zdjizhi.json - * @Description: - * @date 2022/3/2410:22 - */ -public class JsonPathTest { - private static final Log logger = LogFactory.get(); - - private static Properties propNacos = new Properties(); - - /** - * 获取需要删除字段的列表 - */ - private static ArrayList dropList = new ArrayList<>(); - - /** - * 在内存中加载反射类用的map - */ - private static HashMap map; - - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList jobList; - - private static String schema; - - static { - propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); - propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE); - propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); - propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); - try { - ConfigService configService = NacosFactory.createConfigService(propNacos); - String dataId = FlowWriteConfig.NACOS_DATA_ID; - String group = FlowWriteConfig.NACOS_GROUP; - String config = configService.getConfig(dataId, group, 5000); - if (StringUtil.isNotBlank(config)) { - schema = config; - } - } catch (NacosException e) { - logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); - } - } - - @Test - public void parseSchemaGetFields() { - DocumentContext parse = JsonPath.parse(schema); - List fields = parse.read("$.fields[*]"); - for (Object field : fields) { - String name = JsonPath.read(field, "$.name").toString(); - String type = JsonPath.read(field, "$.type").toString(); - } - } -} diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java deleted file mode 100644 index 52b99e5..0000000 --- a/src/test/java/com/zdjizhi/nacos/NacosTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.zdjizhi.nacos; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import org.junit.Test; - -import java.io.IOException; -import java.io.StringReader; -import java.util.Properties; -import java.util.concurrent.Executor; - - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2022/3/1016:58 - */ -public class NacosTest { - - /** - * - * com.alibaba.nacos - * nacos-client - * 1.2.0 - * - */ - - private static Properties properties = new Properties(); - /** - * config data id = config name - */ - private static final String DATA_ID = "test"; - /** - * config group - */ - private static final String GROUP = "Galaxy"; - - private void getProperties() { - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); - properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); - properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); - properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); - } - - - @Test - public void GetConfigurationTest() { - try { - getProperties(); - ConfigService configService = NacosFactory.createConfigService(properties); - String content = configService.getConfig(DATA_ID, GROUP, 5000); - Properties nacosConfigMap = new Properties(); - nacosConfigMap.load(new StringReader(content)); - System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); - } catch (NacosException | IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - @Test - public void ListenerConfigurationTest() { - getProperties(); - try { - //first get config - ConfigService configService = NacosFactory.createConfigService(properties); - String config = configService.getConfig(DATA_ID, GROUP, 5000); - System.out.println(config); - - //start listenner - configService.addListener(DATA_ID, GROUP, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - System.out.println(configMsg); - } - }); - } catch (NacosException e) { - e.printStackTrace(); - } - - //keep running,change nacos config,print new config - while (true) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java deleted file mode 100644 index c81b809..0000000 --- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.zdjizhi.nacos; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - * @author qidaijie - * @Package com.zdjizhi.nacos - * @Description: - * @date 2022/3/1714:57 - */ -public class SchemaListener { - - private static Properties properties = new Properties(); - private static ArrayList jobList; - - - static { - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); - properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); - properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); - properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); - - try { - ConfigService configService = NacosFactory.createConfigService(properties); - String dataId = "session_record.json"; - String group = "Galaxy"; - jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); - configService.addListener(dataId, group, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - jobList = getJobListFromHttp(configMsg); - } - }); - } catch (NacosException e) { - e.printStackTrace(); - } - } - - - @Test - public void dealCommonMessage() { - //keep running,change nacos config,print new config - while (true) { - try { - System.out.println(Arrays.toString(jobList.get(0))); - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) - * - * @return 任务列表 - */ - private static ArrayList getJobListFromHttp(String schema) { - ArrayList list = new ArrayList<>(); - - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(schema); - JSONArray fields = (JSONArray) schemaJson.get("fields"); - - for (Object field : fields) { - - if (JSON.parseObject(field.toString()).containsKey("doc")) { - Object doc = JSON.parseObject(field.toString()).get("doc"); - - if (JSON.parseObject(doc.toString()).containsKey("format")) { - String name = JSON.parseObject(field.toString()).get("name").toString(); - Object format = JSON.parseObject(doc.toString()).get("format"); - JSONObject formatObject = JSON.parseObject(format.toString()); - - String functions = formatObject.get("functions").toString(); - String appendTo = null; - String params = null; - - if (formatObject.containsKey("appendTo")) { - appendTo = formatObject.get("appendTo").toString(); - } - - if (formatObject.containsKey("param")) { - params = formatObject.get("param").toString(); - } - - - if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], null}); - } - - } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); - - } - } else { - list.add(new String[]{name, name, functions, params}); - } - - } - } - - } - return list; - } - -}