diff --git a/pom.xml b/pom.xml index 068ad0d..c0c2faa 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-completion-schema - 220316-encryption + 220318-nacos log-completion-schema http://www.example.com @@ -37,7 +37,7 @@ 2.7.1 1.0.0 2.2.3 - 1.4.1 + 1.2.0 provided diff --git a/properties/default_config.properties b/properties/default_config.properties index cee5c76..aaeccfc 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -3,7 +3,7 @@ session.timeout.ms=60000 #kafka source poll -max.poll.records=3000 +max.poll.records=5000 #kafka source poll bytes max.partition.fetch.bytes=31457280 @@ -33,8 +33,19 @@ kafka.user=nsyGpHKGFA4KW0zro9MDdw== #kafka SASL及SSL验证密码-加密 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ -#====================Topology Default====================# +#生产者ack +producer.ack=1 +#====================nacos default====================# +#nacos username +nacos.username=nacos + +#nacos password +nacos.pin=nacos + +#nacos group +nacos.group=Galaxy +#====================Topology Default====================# #hbase table name hbase.table.name=tsg_galaxy:relation_framedip_account @@ -45,4 +56,10 @@ mail.default.charset=UTF-8 log.transform.type=1 #两个输出之间的最大时间(单位milliseconds) -buffer.timeout=5000 \ No newline at end of file +buffer.timeout=5000 +#====================临时配置-待删除====================# +#网关APP_ID 获取接口 +app.id.http=http://192.168.44.20:9999/open-api/appDicList + +#app_id 更新时间,如填写0则不更新缓存 +app.tick.tuple.freq.secs=0 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3640a70..7a8f907 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,68 +1,48 @@ -#--------------------------------地址配置------------------------------# +#--------------------------------nacos配置------------------------------# +#nacos 地址 +nacos.server=192.168.44.12:8848 -#管理kafka地址 -source.kafka.servers=192.168.44.12:9094 +#nacos namespace +nacos.schema.namespace=test -#管理输出kafka地址 -sink.kafka.servers=192.168.44.12:9094 +#nacos topology_common_config.properties namespace +nacos.common.namespace=flink -#zookeeper 地址 用于配置log_id -zookeeper.servers=192.168.44.12:2181 +#nacos data id +nacos.data.id=session_record.json -#hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=192.168.44.12:2181 - -#--------------------------------HTTP/定位库------------------------------# -#定位库地址 -tools.library=D:\\workerspace\\dat\\ - -#网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/session_record - -#网关APP_ID 获取接口 -app.id.http=http://192.168.44.67:9999/open-api/appDicList - -#--------------------------------Kafka消费组信息------------------------------# +#--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic -source.kafka.topic=test +source.kafka.topic=SESSION-RECORD #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=SESSION-RECORD-COMPLETED #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=flinktest-1 -#生产者压缩模式 none or snappy -producer.kafka.compression.type=none - -#生产者ack -producer.ack=1 - #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=1 +source.parallelism=9 #转换函数并行度 -transform.parallelism=1 +transform.parallelism=27 #kafka producer 并行度 -sink.parallelism=1 +sink.parallelism=9 -#数据中心,取值范围(0-63) +#数据中心,取值范围(0-31) data.center.id.num=0 #hbase 更新时间,如填写0则不更新缓存 hbase.tick.tuple.freq.secs=180 -#app_id 更新时间,如填写0则不更新缓存 -app.tick.tuple.freq.secs=0 - #--------------------------------默认值配置------------------------------# -#邮件默认编码 -mail.default.charset=UTF-8 - #0不需要补全原样输出日志,1需要补全 -log.need.complete=1 \ No newline at end of file +log.need.complete=1 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index eab9f06..fc0c194 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -16,12 +16,37 @@ public class FlowWriteConfig { } public static final int IF_PARAM_LENGTH = 3; + /** + * 有此标识的字段为失效字段,不计入最终日志字段 + */ public static final String VISIBILITY = "disabled"; + /** + * 默认的切分符号 + */ public static final String FORMAT_SPLITTER = ","; + /** + * 标识字段为日志字段还是schema指定字段 + */ public static final String IS_JSON_KEY_TAG = "$."; + /** + * if函数连接分隔符 + */ public static final String IF_CONDITION_SPLITTER = "="; - public static final String MODEL = "remote"; - public static final String PROTOCOL_SPLITTER = "\\."; + /** + * 默认的字符串解析编码 + */ + public static final String ENCODING = "UTF8"; + + /** + * Nacos + */ + public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server"); + public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace"); + public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace"); + public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id"); + public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin"); + public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username"); /** * System config @@ -29,18 +54,31 @@ public class FlowWriteConfig { public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism"); public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism"); public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism"); - public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); - public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs"); public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); - public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); - public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset"); public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type"); public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout"); + /** + * HBase + */ + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + + + /** + * kafka common + */ + public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); + public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin")); + + /** * kafka source config */ + public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms"); public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records"); public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); @@ -49,19 +87,9 @@ public class FlowWriteConfig { /** * kafka sink config */ - public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers"); - public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers"); - public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); - public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); - public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic"); - public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic"); - public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack"); - public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library"); + public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); - public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); - public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin")); - /** * connection kafka @@ -76,7 +104,17 @@ public class FlowWriteConfig { /** * http */ - public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); - public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http"); + public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs"); + + /** + * common config + */ + public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("etl.sink.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers"); + public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library"); + public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers"); + } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/NacosConfig.java b/src/main/java/com/zdjizhi/common/NacosConfig.java new file mode 100644 index 0000000..f71732f --- /dev/null +++ b/src/main/java/com/zdjizhi/common/NacosConfig.java @@ -0,0 +1,106 @@ +package com.zdjizhi.common; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.common + * @Description: + * @date 2022/3/189:36 + */ +public class NacosConfig { + private static final Log logger = LogFactory.get(); + private static Properties propCommon = new Properties(); + private static Properties propNacos = new Properties(); + + private static NacosConfig nacosConfig; + + private static void getInstance() { + nacosConfig = new NacosConfig(); + } + + /** + * 构造函数-新 + */ + private NacosConfig() { + //获取连接 + getConnection(); + } + + /** + * 初始化Nacos配置列表 + */ + private static void getConnection() { + try { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_COMMON_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + ConfigService configService = NacosFactory.createConfigService(propNacos); + String commonConfig = configService.getConfig("topology_common_config.properties", FlowWriteConfig.NACOS_GROUP, 5000); + if (StringUtil.isNotBlank(commonConfig)) { + propCommon.load(new StringReader(commonConfig)); + } + } catch (NacosException | IOException e) { + logger.error("Get topology run configuration error,The exception message is :" + e.getMessage()); + } + } + + /** + * 获取String类型配置 + * + * @param key config key + * @return value + */ + public static String getStringProperty(String key) { + + if (nacosConfig == null) { + getInstance(); + } + return propCommon.getProperty(key); + + } + + /** + * 获取Integer类型配置 + * + * @param key config key + * @return value + */ + public static Integer getIntegerProperty(String key) { + if (nacosConfig == null) { + getInstance(); + } + + return Integer.parseInt(propCommon.getProperty(key)); + + } + + + /** + * 获取Long类型配置 + * + * @param key config key + * @return value + */ + public static Long getLongProperty(String key) { + if (nacosConfig == null) { + getInstance(); + } + + return Long.parseLong(propCommon.getProperty(key)); + + } + +} diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 2d42769..c98687b 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -10,6 +10,7 @@ import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Map; @@ -30,8 +31,9 @@ public class LogFlowWriteTopology { environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { - DataStreamSource> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer()) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM); + + SingleOutputStreamOperator> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer()) + .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM).name(FlowWriteConfig.SOURCE_KAFKA_TOPIC); DataStream cleaningLog; switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) { @@ -56,7 +58,7 @@ public class LogFlowWriteTopology { .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); //发送数据到Kafka - result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") + result.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC) .setParallelism(FlowWriteConfig.SINK_PARALLELISM); } else { DataStreamSource streamSource = environment.addSource(KafkaConsumer.flinkConsumer()) @@ -67,7 +69,7 @@ public class LogFlowWriteTopology { .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); //发送数据到Kafka - result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") + result.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC) .setParallelism(FlowWriteConfig.SINK_PARALLELISM); } diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java index 168fec2..7cb907e 100644 --- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -44,7 +44,7 @@ public class SnowflakeId { private final long maxWorkerId = -1L ^ (-1L << workerIdBits); /** - * 支持的最大数据标识id,结果是127 + * 支持的最大数据标识id,结果是31 */ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java index de4ca99..537d172 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -3,12 +3,9 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonParseUtil; -import java.util.ArrayList; import java.util.Map; @@ -20,13 +17,6 @@ import java.util.Map; public class TransFormMap { private static final Log logger = LogFactory.get(); - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); - /** * 解析日志,并补全 * @@ -37,7 +27,7 @@ public class TransFormMap { public static String dealCommonMessage(Map jsonMap) { try { JsonParseUtil.dropJsonField(jsonMap); - for (String[] strings : jobList) { + for (String[] strings : JsonParseUtil.getJobList()) { //用到的参数的值 Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); //需要补全的字段的key @@ -52,7 +42,7 @@ public class TransFormMap { } return JsonMapper.toJsonString(jsonMap); } catch (RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e.getMessage()); + logger.error("TransForm logs failed,The exception is :" + e); return null; } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java index 2e80908..d251e22 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -3,19 +3,11 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.serializer.SerializerFeature; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonParseUtil; -import com.zdjizhi.utils.json.JsonTypeUtils; -import java.util.ArrayList; import java.util.Map; -import static com.alibaba.fastjson.serializer.SerializerFeature.WriteMapNullValue; - /** * 描述:转换或补全工具类 @@ -25,13 +17,6 @@ import static com.alibaba.fastjson.serializer.SerializerFeature.WriteMapNullValu public class TransFormTypeMap { private static final Log logger = LogFactory.get(); - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); - /** * 解析日志,并补全 * @@ -41,8 +26,8 @@ public class TransFormTypeMap { @SuppressWarnings("unchecked") public static String dealCommonMessage(Map message) { try { - Map jsonMap = JsonTypeUtils.typeTransform(message); - for (String[] strings : jobList) { + Map jsonMap = JsonParseUtil.typeTransform(message); + for (String[] strings : JsonParseUtil.getJobList()) { //用到的参数的值 Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); //需要补全的字段的key @@ -57,7 +42,7 @@ public class TransFormTypeMap { } return JsonMapper.toJsonString(jsonMap); } catch (RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e.getMessage()); + logger.error("TransForm logs failed,The exception is :" + e); return null; } } @@ -137,7 +122,7 @@ public class TransFormTypeMap { break; case "app_match": if (logValue != null && appendToKeyValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); +// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); } break; default: diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index afa1bf3..d69e864 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -5,14 +5,23 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.common.NacosConfig; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.http.HttpClientUtil; +import com.zdjizhi.utils.system.FlowWriteConfigurations; import net.sf.cglib.beans.BeanGenerator; import net.sf.cglib.beans.BeanMap; import java.util.*; +import java.util.concurrent.Executor; + +import static com.zdjizhi.utils.json.JsonTypeUtils.*; /** * 使用FastJson解析json的工具类 @@ -20,10 +29,56 @@ import java.util.*; * @author qidaijie */ public class JsonParseUtil { - private static final Log logger = LogFactory.get(); + private static Properties propNacos = new Properties(); + /** + * 获取需要删除字段的列表 + */ private static ArrayList dropList = new ArrayList<>(); + /** + * 在内存中加载反射类用的map + */ + private static HashMap map; + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList; + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + try { + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = FlowWriteConfig.NACOS_DATA_ID; + String group = FlowWriteConfig.NACOS_GROUP; + String schema = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(schema)) { + jobList = getJobListFromHttp(schema); + map = getMapFromHttp(schema); + } + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + if (StringUtil.isNotBlank(configMsg)) { + map = getMapFromHttp(configMsg); + jobList = getJobListFromHttp(configMsg); + } + } + }); + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } /** * 模式匹配,给定一个类型字符串返回一个类类型 @@ -105,22 +160,6 @@ public class JsonParseUtil { } } - /** - * 获取属性值的方法 - * - * @param jsonMap 原始日志 - * @param property key - * @return 属性的值 - */ - public static Object getValue(JSONObject jsonMap, String property) { - try { - return jsonMap.getOrDefault(property, null); - } catch (RuntimeException e) { - logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); - return null; - } - } - /** * 更新属性值的方法 * @@ -153,50 +192,60 @@ public class JsonParseUtil { } /** - * 更新属性值的方法 + * 类型转换 * - * @param jsonMap 原始日志json map - * @param property 更新的key - * @param value 更新的值 + * @param jsonMap 原始日志map */ - public static void setValue(JSONObject jsonMap, String property, Object value) { - try { - jsonMap.put(property, value); - } catch (RuntimeException e) { - logger.error("赋予实体类错误类型数据", e); + public static Map typeTransform(Map jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (map.containsKey(key)) { + String simpleName = map.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, checkString(jsonMap.get(key))); + } + } } + return tmpMap; } - /** - * 根据反射生成对象的方法 - * - * @param properties 反射类用的map - * @return 生成的Object类型的对象 - */ - public static Object generateObject(Map properties) { - BeanGenerator generator = new BeanGenerator(); - Set keySet = properties.keySet(); - for (Object aKeySet : keySet) { - String key = (String) aKeySet; - generator.addProperty(key, (Class) properties.get(key)); - } - return generator.create(); + public static ArrayList getJobList() { + return jobList; } + /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + *

+ * // * @param http 网关schema地址 * - * @param http 网关schema地址 * @return 用于反射生成schema类型的对象的一个map集合 */ - public static HashMap getMapFromHttp(String http) { + public static HashMap getMapFromHttp(String schema) { HashMap map = new HashMap<>(16); - String schema = HttpClientUtil.requestByGetMethod(http); - Object data = JSON.parseObject(schema).get("data"); - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONObject schemaJson = JSON.parseObject(schema); JSONArray fields = (JSONArray) schemaJson.get("fields"); for (Object field : fields) { @@ -239,6 +288,7 @@ public class JsonParseUtil { /** * 删除schema内指定的无效字段(jackson) + * * @param jsonMap */ public static void dropJsonField(Map jsonMap) { @@ -247,31 +297,17 @@ public class JsonParseUtil { } } - /** - * 删除schema内指定的无效字段(fastjson) - * @param jsonMap - */ - public static void dropJsonField(JSONObject jsonMap) { - for (String field : dropList) { - jsonMap.remove(field); - } - } - /** * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) * - * @param http 网关url + * @param schema 网关url * @return 任务列表 */ - public static ArrayList getJobListFromHttp(String http) { + public static ArrayList getJobListFromHttp(String schema) { ArrayList list = new ArrayList<>(); - String schema = HttpClientUtil.requestByGetMethod(http); - //解析data - Object data = JSON.parseObject(schema).get("data"); - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONObject schemaJson = JSON.parseObject(schema); JSONArray fields = (JSONArray) schemaJson.get("fields"); for (Object field : fields) { diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java index 0b6bc1e..63af9d5 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -2,6 +2,11 @@ package com.zdjizhi.utils.json; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.exception.FlowWriteException; @@ -9,6 +14,11 @@ import com.zdjizhi.utils.exception.FlowWriteException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import static com.zdjizhi.utils.json.JsonParseUtil.getJobListFromHttp; +import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp; /** * @author qidaijie @@ -17,70 +27,26 @@ import java.util.Map; * @date 2021/7/1217:34 */ public class JsonTypeUtils { - private static final Log logger = LogFactory.get(); - /** - * 在内存中加载反射类用的map - */ - private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); - - /** - * 类型转换 - * - * @param jsonMap 原始日志map - */ - public static Map typeTransform(Map jsonMap) throws RuntimeException { - JsonParseUtil.dropJsonField(jsonMap); - HashMap tmpMap = new HashMap<>(192); - for (String key : jsonMap.keySet()) { - if (map.containsKey(key)) { - String simpleName = map.get(key).getSimpleName(); - switch (simpleName) { - case "String": - tmpMap.put(key, checkString(jsonMap.get(key))); - break; - case "Integer": - tmpMap.put(key, getIntValue(jsonMap.get(key))); - break; - case "long": - tmpMap.put(key, checkLongValue(jsonMap.get(key))); - break; - case "List": - tmpMap.put(key, checkArray(jsonMap.get(key))); - break; - case "Map": - tmpMap.put(key, checkObject(jsonMap.get(key))); - break; - case "double": - tmpMap.put(key, checkDouble(jsonMap.get(key))); - break; - default: - tmpMap.put(key, checkString(jsonMap.get(key))); - } - } - } - return tmpMap; - } - /** * String 类型检验转换方法 * * @param value json value * @return String value */ - private static String checkString(Object value) { + static String checkString(Object value) { if (value == null) { return null; } - if (value instanceof Map){ + if (value instanceof Map) { return JsonMapper.toJsonString(value); } - if (value instanceof List){ + if (value instanceof List) { return JsonMapper.toJsonString(value); } - return value.toString(); + return value.toString(); } /** @@ -89,7 +55,7 @@ public class JsonTypeUtils { * @param value json value * @return List value */ - private static Map checkObject(Object value) { + static Map checkObject(Object value) { if (value == null) { return null; } @@ -107,7 +73,7 @@ public class JsonTypeUtils { * @param value json value * @return List value */ - private static List checkArray(Object value) { + static List checkArray(Object value) { if (value == null) { return null; } @@ -119,27 +85,19 @@ public class JsonTypeUtils { throw new FlowWriteException("can not cast to List, value : " + value); } - private static Long checkLong(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToLong(value); - } - /** * long 类型检验转换方法,若为空返回基础值 * * @param value json value * @return Long value */ - private static long checkLongValue(Object value) { + static long checkLongValue(Object value) { Long longVal = TypeUtils.castToLong(value); + if (longVal == null) { return 0L; } -// return longVal.longValue(); return longVal; } @@ -149,7 +107,7 @@ public class JsonTypeUtils { * @param value json value * @return Double value */ - private static Double checkDouble(Object value) { + static Double checkDouble(Object value) { if (value == null) { return null; } @@ -158,29 +116,18 @@ public class JsonTypeUtils { } - private static Integer checkInt(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToInt(value); - } - - /** * int 类型检验转换方法,若为空返回基础值 * * @param value json value * @return int value */ - private static int getIntValue(Object value) { + static int getIntValue(Object value) { Integer intVal = TypeUtils.castToInt(value); if (intVal == null) { return 0; } - -// return intVal.intValue(); return intVal; } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 078c2fe..f3d979b 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -2,8 +2,12 @@ package com.zdjizhi.utils.kafka; import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; @@ -25,9 +29,7 @@ public class KafkaConsumer { properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS); properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS); properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES); - properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10"); + properties.put("partition.discovery.interval.ms", "10000"); CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties); return properties; @@ -42,7 +44,10 @@ public class KafkaConsumer { FlinkKafkaConsumer> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, new TimestampDeserializationSchema(), createConsumerConfig()); - kafkaConsumer.setCommitOffsetsOnCheckpoints(false); + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + + //从消费组当前的offset开始消费 kafkaConsumer.setStartFromGroupOffsets(); return kafkaConsumer; @@ -57,7 +62,7 @@ public class KafkaConsumer { FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, new SimpleStringSchema(), createConsumerConfig()); - kafkaConsumer.setCommitOffsetsOnCheckpoints(false); + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); kafkaConsumer.setStartFromGroupOffsets(); return kafkaConsumer; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index f2f399d..843028b 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -3,8 +3,6 @@ package com.zdjizhi.utils.kafka; import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.SslConfigs; import java.util.Optional; import java.util.Properties; @@ -39,12 +37,14 @@ public class KafkaProducer { FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( FlowWriteConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), - createProducerConfig(), Optional.empty()); + //sink与所有分区建立连接,轮询写入; + createProducerConfig(), + Optional.empty()); - kafkaProducer.setLogFailuresOnly(false); - -// kafkaProducer.setWriteTimestampToKafka(true); + //允许producer记录失败日志而不是捕获和抛出它们 + kafkaProducer.setLogFailuresOnly(true); return kafkaProducer; } + } diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java index e978369..920ffab 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java +++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java @@ -2,6 +2,7 @@ package com.zdjizhi.utils.kafka; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.JsonMapper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; @@ -17,7 +18,11 @@ import java.util.Map; */ public class TimestampDeserializationSchema implements KafkaDeserializationSchema { private static final Log logger = LogFactory.get(); - private final String ENCODING = "UTF8"; + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Map.class); + } @Override public boolean isEndOfStream(Object nextElement) { @@ -25,11 +30,12 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem } @Override + @SuppressWarnings("unchecked") public Map deserialize(ConsumerRecord record) throws Exception { if (record != null) { try { long timestamp = record.timestamp() / 1000; - String value = new String((byte[]) record.value(), ENCODING); + String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING); Map json = (Map) JsonMapper.fromJsonString(value, Map.class); json.put("common_ingestion_time", timestamp); return json; @@ -39,9 +45,4 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem } return null; } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(Map.class); - } } diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java index 08fa29b..d793628 100644 --- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -1,8 +1,14 @@ package com.zdjizhi.utils.system; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; import java.io.IOException; +import java.io.StringReader; import java.util.Locale; import java.util.Properties; @@ -25,7 +31,6 @@ public final class FlowWriteConfigurations { } else { return null; } - } public static Integer getIntProperty(Integer type, String key) { diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java new file mode 100644 index 0000000..170086c --- /dev/null +++ b/src/test/java/com/zdjizhi/EncryptorTest.java @@ -0,0 +1,35 @@ +package com.zdjizhi; + +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1610:55 + */ +public class EncryptorTest { + + + @Test + public void passwordTest(){ + StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + // 配置加密解密的密码/salt值 + encryptor.setPassword("galaxy"); + // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p + String pin = "galaxy2019"; + String encPin = encryptor.encrypt(pin); + String user = "admin"; + String encUser = encryptor.encrypt(user); + System.out.println(encPin); + System.out.println(encUser); + // 再进行解密:raw_password + String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ"); + String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw=="); + + System.out.println("The username is: "+rawPwd); + System.out.println("The pin is: "+rawUser); + } + +} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..2dd5837 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,52 @@ +package com.zdjizhi; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.IpLookupV2; +import com.zdjizhi.utils.general.CityHash; +import org.junit.Test; + +import java.math.BigInteger; +import java.util.Calendar; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/11/611:38 + */ +public class FunctionTest { + + private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) + .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") + .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") + .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") + .build(); + + @Test + public void CityHashTest() { + + byte[] dataBytes = String.valueOf(613970406986188816L).getBytes(); + long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); + String decimalValue = Long.toUnsignedString(hashValue, 10); + BigInteger result = new BigInteger(decimalValue); + System.out.println(result); + } + + @Test + public void ipLookupTest() { + String ip = "61.144.36.144"; + System.out.println(ipLookup.cityLookupDetail(ip)); + System.out.println(ipLookup.countryLookup(ip)); + } + + @Test + public void timestampTest(){ + Calendar cal = Calendar.getInstance(); + Long utcTime=cal.getTimeInMillis(); + System.out.println(utcTime); + System.out.println(System.currentTimeMillis()); + } +} diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java new file mode 100644 index 0000000..5f94e32 --- /dev/null +++ b/src/test/java/com/zdjizhi/HBaseTest.java @@ -0,0 +1,54 @@ +package com.zdjizhi; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/12/310:42 + */ +public class HBaseTest { + + @Test + public void getColumn() { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + int acctStatusType; + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + acctStatusType = 3; + } + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); +// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java deleted file mode 100644 index 4b034a3..0000000 --- a/src/test/java/com/zdjizhi/KafkaTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.zdjizhi; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.kafka.clients.producer.*; -import org.apache.kafka.common.config.SslConfigs; - -import java.util.Properties; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/8/217:39 - */ -public class KafkaTest { - private static final Log logger = LogFactory.get(); - - public static void main(String[] args) { - Properties properties = new Properties(); - properties.put("bootstrap.servers", "192.168.44.12:9091"); - properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - properties.put("acks", "1"); -// properties.put("retries", DefaultProConfig.RETRIES); -// properties.put("linger.ms", DefaultProConfig.LINGER_MS); -// properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); -// properties.put("batch.size", DefaultProConfig.BATCH_SIZE); -// properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); -// properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); - - properties.put("security.protocol", "SSL"); -// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks"); - properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\keystore.jks"); - properties.put("ssl.keystore.password", "galaxy2019"); -// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks"); - properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\truststore.jks"); - properties.put("ssl.truststore.password", "galaxy2019"); - properties.put("ssl.key.password", "galaxy2019"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - - Producer producer = new KafkaProducer(properties); - - producer.send(new ProducerRecord<>("test", "hello!"), new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - logger.error("写入test出现异常", exception); - } - } - }); - - producer.close(); - } -} diff --git a/src/test/java/com/zdjizhi/LocationTest.java b/src/test/java/com/zdjizhi/LocationTest.java deleted file mode 100644 index e7b2d15..0000000 --- a/src/test/java/com/zdjizhi/LocationTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.IpLookup; -import org.junit.Test; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/8/1811:34 - */ -public class LocationTest { - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v4.mmdb") - .loadDataFileV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v6.mmdb") - .loadDataFilePrivateV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v4.mmdb") - .loadDataFilePrivateV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v6.mmdb") - .build(); - - @Test - public void IpLocationTest() { - System.out.println(ipLookup.cityLookupDetail("24.241.112.0")); - System.out.println(ipLookup.cityLookupDetail("1.1.1.1")); - System.out.println(ipLookup.cityLookupDetail("192.168.50.58")); - System.out.println(ipLookup.cityLookupDetail("2600:1700:9010::")); - } -} diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java new file mode 100644 index 0000000..c2b6267 --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java @@ -0,0 +1,86 @@ +package com.zdjizhi.nacos; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.system.FlowWriteConfigurations; +import org.junit.Test; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; +import java.util.concurrent.Executor; + + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1016:58 + */ +public class NacosTest { + private static Properties properties = new Properties(); + + @Test + public void getProperties() { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + } + + + @Test + public void GetConfigurationTest() { + try { + getProperties(); + ConfigService configService = NacosFactory.createConfigService(properties); + String content = configService.getConfig("topology_common_config.properties", "Galaxy", 5000); + Properties nacosConfigMap = new Properties(); + nacosConfigMap.load(new StringReader(content)); + System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); + System.out.println(nacosConfigMap.getProperty("schema.http")); + } catch (NacosException | IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + @Test + public void ListenerConfigurationTest() { + getProperties(); + ConfigService configService = null; + try { + configService = NacosFactory.createConfigService(properties); + String content = configService.getConfig("ETL-SESSION-RECORD-COMPLETED", "etl", 5000); + + Properties nacosConfigMap = new Properties(); + nacosConfigMap.load(new StringReader(content)); + System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); + + configService.addListener("ETL-SESSION-RECORD-COMPLETED", "etl", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + try { + Properties nacosConfigMap = new Properties(); + nacosConfigMap.load(new StringReader(configMsg)); + System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } catch (NacosException | IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java new file mode 100644 index 0000000..b64d9eb --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/SchemaListener.java @@ -0,0 +1,138 @@ +package com.zdjizhi.nacos; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.JsonTypeUtils; + +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.nacos + * @Description: + * @date 2022/3/1714:57 + */ +public class SchemaListener { + + private static Properties properties = new Properties(); + private static ArrayList jobList; + + + static { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + + try { + ConfigService configService = NacosFactory.createConfigService(properties); + String dataId = "session_record.json"; + String group = "Galaxy"; + jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + jobList = getJobListFromHttp(configMsg); + } + }); + } catch (NacosException e) { + e.printStackTrace(); + } + } + + + @SuppressWarnings("unchecked") + public static void dealCommonMessage() { + + System.out.println(Arrays.toString(jobList.get(0))); + + } + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + public static ArrayList getJobListFromHttp(String schema) { + ArrayList list = new ArrayList<>(); + + //解析data +// Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(schema); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if (JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + +}