diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..365e1c6 --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,65 @@ +#====================Kafka KafkaConsumer====================# +#kafka source connection timeout +session.timeout.ms=60000 + +#kafka source poll +max.poll.records=5000 + +#kafka source poll bytes +max.partition.fetch.bytes=31457280 +#====================Kafka KafkaProducer====================# +#producer重试的次数设置 +retries=0 + +#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了 +linger.ms=10 + +#如果在超时之前未收到响应,客户端将在必要时重新发送请求 +request.timeout.ms=30000 + +#producer都是按照batch进行发送的,批次大小,默认:16384 +batch.size=262144 + +#Producer端用于缓存消息的缓冲区大小 +#128M +buffer.memory=134217728 + +#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 +#10M +max.request.size=10485760 +#====================kafka default====================# +#kafka SASL验证用户名-加密 +kafka.user=nsyGpHKGFA4KW0zro9MDdw== + +#kafka SASL及SSL验证密码-加密 +kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ + +#生产者ack +producer.ack=1 +#====================nacos default====================# +#nacos username +nacos.username=nacos + +#nacos password +nacos.pin=nacos + +#nacos group +nacos.group=Galaxy +#====================Topology Default====================# +#hbase table name +hbase.table.name=tsg_galaxy:relation_framedip_account + +#邮件默认编码 +mail.default.charset=UTF-8 + +#0不做任何校验,1弱类型校验 +log.transform.type=1 + +#两个输出之间的最大时间(单位milliseconds) +buffer.timeout=5000 +#====================临时配置-待删除====================# +#网关APP_ID 获取接口 +app.id.http=http://192.168.44.67:9999/open-api/appDicList + +#app_id 更新时间,如填写0则不更新缓存 +app.tick.tuple.freq.secs=0 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..241d28c --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,77 @@ +#--------------------------------地址配置------------------------------# +#管理kafka地址 +source.kafka.servers=10.3.60.3:9094 + +#百分点输出kafka地址 +percent.sink.kafka.servers=10.3.45.126:6667,10.3.45.127:6667,10.3.45.128:6667 + +#文件源数据topic输出kafka地址 +file.data.sink.kafka.servers=10.3.60.3:9094 + +#zookeeper 地址 用于配置log_id +zookeeper.servers=10.3.60.3:2181 + +#hbase zookeeper地址 用于连接HBase +hbase.zookeeper.servers=10.3.60.3:2181 + +#--------------------------------HTTP/定位库------------------------------# +#定位库地址 +tools.library=/opt/dat/ + +#--------------------------------nacos配置------------------------------# +#nacos 地址 +nacos.server=10.3.60.3:8848 + +#nacos namespace +nacos.schema.namespace=prod + +#nacos topology_common_config.properties namespace +nacos.common.namespace=prod + +#nacos data id +nacos.data.id=session_record.json + +#------------------------------------OOS配置------------------------------------# +#oos地址 +oos.servers=10.3.45.100:8057 + +#--------------------------------Kafka消费/生产配置------------------------------# + +#kafka 接收数据topic +source.kafka.topic=SESSION-RECORD + +#百分点对应的topic +percent.kafka.topic=SESSION-RECORD + +#文件源数据topic +file.data.kafka.topic=test-file-data + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=flinktest-1 + +#--------------------------------topology配置------------------------------# + +#consumer 并行度 +source.parallelism=1 + +#转换函数并行度 +transform.parallelism=1 + +#percent producer 并行度 +percent.sink.parallelism=1 + +#filedata producer 并行度 +file.data.sink.parallelism=1 +#数据中心,取值范围(0-31) +data.center.id.num=0 + +#hbase 更新时间,如填写0则不更新缓存 +hbase.tick.tuple.freq.secs=180 + +#--------------------------------默认值配置------------------------------# + +#0不需要补全原样输出日志,1需要补全 +log.need.complete=1 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java new file mode 100644 index 0000000..fdada73 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -0,0 +1,134 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + + static { + encryptor.setPassword("galaxy"); + } + + public static final int IF_PARAM_LENGTH = 3; + /** + * 有此标识的字段为失效字段,不计入最终日志字段 + */ + public static final String VISIBILITY = "disabled"; + /** + * 默认的切分符号 + */ + public static final String FORMAT_SPLITTER = ","; + /** + * 标识字段为日志字段还是schema指定字段 + */ + public static final String IS_JSON_KEY_TAG = "$."; + /** + * if函数连接分隔符 + */ + public static final String IF_CONDITION_SPLITTER = "="; + /** + * 默认的字符串解析编码 + */ + public static final String ENCODING = "UTF8"; + + /** + * Nacos + */ + public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server"); + public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace"); + public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace"); + public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id"); + public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin"); + public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username"); + + /** + * System config + */ + public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism"); + public static final Integer PERCENT_SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "percent.sink.parallelism"); + public static final Integer FILE_DATA_SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "file.data.sink.parallelism"); + public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset"); + public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type"); + public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout"); + + /** + * HBase + */ + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + + /** + * kafka common + */ + public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); + public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin")); + + public static final String PERCENT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "percent.kafka.topic"); + /** + * kafka source config + */ + public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + + /** + * kafka sink config + */ + + + public static final String FILE_DATA_SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "file.data.kafka.topic"); + public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic"); + + + /** + * connection kafka + */ + public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); + + /** + * http + */ + public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs"); + + /** + * common config + */ + /** + * public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers"); + * public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers"); + * public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers"); + * public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library"); + * public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers"); + */ + public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers"); + public static final String PERCENT_SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"percent.sink.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers"); + public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers"); + + public static final String FILE_DATA_SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"file.data.sink.kafka.servers"); + /** + * oos + */ + public static final String OOS_SERVERS = FlowWriteConfigurations.getStringProperty(0, "oos.servers"); +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/NacosConfig.java b/src/main/java/com/zdjizhi/common/NacosConfig.java new file mode 100644 index 0000000..08bb92a --- /dev/null +++ b/src/main/java/com/zdjizhi/common/NacosConfig.java @@ -0,0 +1,107 @@ +package com.zdjizhi.common; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.common + * @Description: + * @date 2022/3/189:36 + */ +@Deprecated +public class NacosConfig { + private static final Log logger = LogFactory.get(); + private static Properties propCommon = new Properties(); + private static Properties propNacos = new Properties(); + + private static NacosConfig nacosConfig; + + private static void getInstance() { + nacosConfig = new NacosConfig(); + } + + /** + * 构造函数-新 + */ + private NacosConfig() { + //获取连接 + getConnection(); + } + + /** + * 初始化Nacos配置列表 + */ + private static void getConnection() { + try { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_COMMON_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + ConfigService configService = NacosFactory.createConfigService(propNacos); + String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000); + if (StringUtil.isNotBlank(commonConfig)) { + propCommon.load(new StringReader(commonConfig)); + } + } catch (NacosException | IOException e) { + logger.error("Get topology run configuration error,The exception message is :" + e.getMessage()); + } + } + + /** + * 获取String类型配置 + * + * @param key config key + * @return value + */ + public static String getStringProperty(String key) { + + if (nacosConfig == null) { + getInstance(); + } + return propCommon.getProperty(key); + + } + + /** + * 获取Integer类型配置 + * + * @param key config key + * @return value + */ + public static Integer getIntegerProperty(String key) { + if (nacosConfig == null) { + getInstance(); + } + + return Integer.parseInt(propCommon.getProperty(key)); + + } + + + /** + * 获取Long类型配置 + * + * @param key config key + * @return value + */ + public static Long getLongProperty(String key) { + if (nacosConfig == null) { + getInstance(); + } + + return Long.parseLong(propCommon.getProperty(key)); + + } + +} diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..bbb190e --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -0,0 +1,74 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.functions.DealFileProcessFunction; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.functions.TypeMapCompletedFunction; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Map; + +/** + * @author 王成成 + * @Package com.zdjizhi.topology + * @Description: + * @date 2022.06.01 + */ +public class LogFlowWriteTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + //两个输出之间的最大时间 (单位milliseconds) + environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); + + if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { + + SingleOutputStreamOperator> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer()) + .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM).name(FlowWriteConfig.SOURCE_KAFKA_TOPIC); + + DataStream> cleaningLog; + switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) { + case 0: + //对原始日志进行处理补全转换等,不对日志字段类型做校验。 + cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + break; + case 1: + //对原始日志进行处理补全转换等,对日志字段类型做若校验,可根据schema进行强转。 + cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + break; + default: + //对原始日志进行处理补全转换等,不对日志字段类型做校验。 + cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + } + //处理带有非结构化日志的数据 + SingleOutputStreamOperator process = cleaningLog.process(new DealFileProcessFunction()); + SingleOutputStreamOperator resultFileMetaData = process.getSideOutput(DealFileProcessFunction.metaToKafa).filter(new FilterNullFunction()).name("FilterAbnormalTrafficFileMetaData").setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + SingleOutputStreamOperator result = process.filter(new FilterNullFunction()).name("FilterAbnormalData").setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + //文件元数据发送至TRAFFIC-FILE-METADATA + resultFileMetaData.addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta") + .setParallelism(FlowWriteConfig.FILE_DATA_SINK_PARALLELISM); + //补全后的数据发送给百分点的kafka + result.addSink(KafkaProducer.getPercentKafkaProducer()).name("toPercentKafka") + .setParallelism(FlowWriteConfig.PERCENT_SINK_PARALLELISM); + } + try { + environment.execute(args[0]); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + e.printStackTrace(); + } + + } + +} diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java new file mode 100644 index 0000000..1425ce9 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java @@ -0,0 +1,124 @@ +package com.zdjizhi.utils.app; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.http.HttpClientUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * AppId 工具类 + * + * @author qidaijie + */ + +@Deprecated +public class AppUtils { + private static final Log logger = LogFactory.get(); + private static Map appIdMap = new ConcurrentHashMap<>(128); + private static AppUtils appUtils; + + private static void getAppInstance() { + appUtils = new AppUtils(); + } + + + /** + * 构造函数-新 + */ + private AppUtils() { + //定时更新 + updateAppIdCache(); + } + + /** + * 更新变量 + */ + private static void change() { + if (appUtils == null) { + getAppInstance(); + } + timestampsFilter(); + } + + + /** + * 获取变更内容 + */ + private static void timestampsFilter() { + try { + Long begin = System.currentTimeMillis(); + String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP); + if (StringUtil.isNotBlank(schema)) { + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + int key = jsonArray.getInteger(0); + String value = jsonArray.getString(1); + if (appIdMap.containsKey(key)) { + if (!value.equals(appIdMap.get(key))) { + appIdMap.put(key, value); + } + } else { + appIdMap.put(key, value); + } + } + logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis())); + logger.warn("Pull the length of the interface data:[" + objects.size() + "]"); + } + } catch (RuntimeException e) { + logger.error("Update cache app-id failed, exception:" + e); + } + } + + + /** + * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie + */ + private void updateAppIdCache() { + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } + } catch (RuntimeException e) { + logger.error("AppUtils update AppCache is error===>{" + e + "}<==="); + } + } + }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + } + + + /** + * 获取 appName + * + * @param appId app_id + * @return account + */ + public static String getAppName(int appId) { + + if (appUtils == null) { + getAppInstance(); + } + + if (appIdMap.containsKey(appId)) { + return appIdMap.get(appId); + } else { + logger.warn("AppMap get appName is null, ID is :" + appId); + return ""; + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java new file mode 100644 index 0000000..67c88f0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class FlowWriteException extends RuntimeException { + + public FlowWriteException() { + } + + public FlowWriteException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java new file mode 100644 index 0000000..ec811f5 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -0,0 +1,23 @@ +package com.zdjizhi.utils.functions; + + +import com.zdjizhi.utils.general.TransFormMap; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompletedFunction implements MapFunction, Map> { + + @Override + @SuppressWarnings("unchecked") + public Map map(Map logs) { + return TransFormMap.dealCommonMessage(logs); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java new file mode 100644 index 0000000..61a6a46 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java @@ -0,0 +1,23 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class TypeMapCompletedFunction implements MapFunction, Map> { + + @Override + @SuppressWarnings("unchecked") + public Map map(Map logs) { + + return TransFormTypeMap.dealCommonMessage(logs); + } +} diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java new file mode 100644 index 0000000..5de4785 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/CityHash.java @@ -0,0 +1,180 @@ +package com.zdjizhi.utils.general; + + + + +/** + * CityHash64算法对logid进行散列计算 + * 版本规划暂不实现-TSG22.01 + * + * @author qidaijie + */ +@Deprecated +public class CityHash { + + private static final long k0 = 0xc3a5c85c97cb3127L; + private static final long k1 = 0xb492b66fbe98f273L; + private static final long k2 = 0x9ae16a3b2f90404fL; + private static final long k3 = 0xc949d7c7509e6557L; + private static final long k5 = 0x9ddfea08eb382d69L; + + private CityHash() {} + + public static long CityHash64(byte[] s, int index, int len) { + if (len <= 16 ) { + return HashLen0to16(s, index, len); + } else if (len > 16 && len <= 32) { + return HashLen17to32(s, index, len); + } else if (len > 32 && len <= 64) { + return HashLen33to64(s, index, len); + } else { + long x = Fetch64(s, index); + long y = Fetch64(s, index + len - 16) ^ k1; + long z = Fetch64(s, index + len - 56) ^ k0; + long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y); + long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0); + z += ShiftMix(v[1]) * k1; + x = Rotate(z + x, 39) * k1; + y = Rotate(y, 33) * k1; + + len = (len - 1) & ~63; + do { + x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1; + y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1; + x ^= w[1]; + y ^= v[0]; + z = Rotate(z ^ w[0], 33); + v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]); + w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y); + long t = z; + z = x; + x = t; + index += 64; + len -= 64; + } while (len != 0); + return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z, + HashLen16(v[1], w[1]) + x); + } + } + + private static long HashLen0to16(byte[] s, int index, int len) { + if (len > 8) { + long a = Fetch64(s, index); + long b = Fetch64(s, index + len - 8); + return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b; + } + if (len >= 4) { + long a = Fetch32(s, index); + return HashLen16(len + (a << 3), Fetch32(s, index + len - 4)); + } + if (len > 0) { + byte a = s[index]; + byte b = s[index + len >>> 1]; + byte c = s[index + len - 1]; + int y = (a) + (b << 8); + int z = len + (c << 2); + return ShiftMix(y * k2 ^ z * k3) * k2; + } + return k2; + } + + private static long HashLen17to32(byte[] s, int index, int len) { + long a = Fetch64(s, index) * k1; + long b = Fetch64(s, index + 8); + long c = Fetch64(s, index + len - 8) * k2; + long d = Fetch64(s, index + len - 16) * k0; + return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d, + a + Rotate(b ^ k3, 20) - c + len); + } + + private static long HashLen33to64(byte[] s, int index, int len) { + long z = Fetch64(s, index + 24); + long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0; + long b = Rotate(a + z, 52); + long c = Rotate(a, 37); + a += Fetch64(s, index + 8); + c += Rotate(a, 7); + a += Fetch64(s, index + 16); + long vf = a + z; + long vs = b + Rotate(a, 31) + c; + a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32); + z = Fetch64(s, index + len - 8); + b = Rotate(a + z, 52); + c = Rotate(a, 37); + a += Fetch64(s, index + len - 24); + c += Rotate(a, 7); + a += Fetch64(s, index + len - 16); + long wf = a + z; + long ws = b + Rotate(a, 31) + c; + long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0); + return ShiftMix(r * k0 + vs) * k2; + } + + private static long Fetch64(byte[] p, int index) { + return toLongLE(p,index); + } + + private static long Fetch32(byte[] p, int index) { + return toIntLE(p,index); + } + private static long[] WeakHashLen32WithSeeds( + long w, long x, long y, long z, long a, long b) { + a += w; + b = Rotate(b + a + z, 21); + long c = a; + a += x; + a += y; + b += Rotate(a, 44); + return new long[]{a + z, b + c}; + } + + private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) { + return WeakHashLen32WithSeeds(Fetch64(s, index), + Fetch64(s, index + 8), + Fetch64(s, index + 16), + Fetch64(s, index + 24), + a, + b); + } + + private static long toLongLE(byte[] b, int i) { + return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); + } + + private static long toIntLE(byte[] b, int i) { + return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0)); + } + private static long RotateByAtLeastOne(long val, int shift) { + return (val >>> shift) | (val << (64 - shift)); + } + + private static long ShiftMix(long val) { + return val ^ (val >>> 47); + } + + private static long Uint128Low64(long[] x) { + return x[0]; + } + + private static long Rotate(long val, int shift) { + return shift == 0 ? val : (val >>> shift) | (val << (64 - shift)); + } + + private static long Uint128High64(long[] x) { + return x[1]; + } + + private static long Hash128to64(long[] x) { + long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5; + a ^= (a >>> 47); + long b = (Uint128High64(x) ^ a) * k5; + b ^= (b >>> 47); + b *= k5; + return b; + } + + private static long HashLen16(long u, long v) { + return Hash128to64(new long[]{u,v}); + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java new file mode 100644 index 0000000..7cb907e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -0,0 +1,213 @@ +package com.zdjizhi.utils.general; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.zookeeper.DistributedLock; +import com.zdjizhi.utils.zookeeper.ZookeeperUtils; + +/** + * 雪花算法 + * + * @author qidaijie + */ +public class SnowflakeId { + private static final Log logger = LogFactory.get(); + + /** + * 共64位 第一位为符号位 默认0 + * 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :6位(0-63), + * workerId(关联进程):7(0-127) ,序列号:11位(2047/ms) + * + * 序列号 /ms = (-1L ^ (-1L << 11)) + * 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365) + */ + /** + * 开始时间截 (2020-11-14 00:00:00) max 17years + */ + private final long twepoch = 1605283200000L; + + /** + * 机器id所占的位数 + */ + private final long workerIdBits = 8L; + + /** + * 数据标识id所占的位数 + */ + private final long dataCenterIdBits = 5L; + + /** + * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + * M << n = M * 2^n + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** + * 支持的最大数据标识id,结果是31 + */ + private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); + + /** + * 序列在id中占的位数 + */ + private final long sequenceBits = 11L; + + /** + * 机器ID向左移12位 + */ + private final long workerIdShift = sequenceBits; + + /** + * 数据标识id向左移17位(14+6) + */ + private final long dataCenterIdShift = sequenceBits + workerIdBits; + + /** + * 时间截向左移22位(4+6+14) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; + + /** + * 生成序列的掩码,这里为2047 + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** + * 工作机器ID(0~255) + */ + private long workerId; + + /** + * 数据中心ID(0~31) + */ + private long dataCenterId; + + /** + * 毫秒内序列(0~2047) + */ + private long sequence = 0L; + + /** + * 上次生成ID的时间截 + */ + private long lastTimestamp = -1L; + + + /** + * 设置允许时间回拨的最大限制10s + */ + private static final long rollBackTime = 10000L; + + + private static SnowflakeId idWorker; + + private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + static { + idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM); + } + + //==============================Constructors===================================== + + /** + * 构造函数 + */ + private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + try { + lock.lock(); + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); + } + this.workerId = tmpWorkerId; + this.dataCenterId = dataCenterIdNum; + } catch (RuntimeException e) { + logger.error("This is not usual error!!!===>>>" + e + "<<<==="); + }finally { + lock.unlock(); + } + } + + // ==============================Methods========================================== + + /** + * 获得下一个ID (该方法是线程安全的) + * + * @return SnowflakeId + */ + private synchronized long nextId() { + long timestamp = timeGen(); + //设置一个允许回拨限制时间,系统时间回拨范围在rollBackTime内可以等待校准 + if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) { + timestamp = tilNextMillis(lastTimestamp); + } + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) + | (dataCenterId << dataCenterIdShift) + | (workerId << workerIdShift) + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + + /** + * 静态工具类 + * + * @return + */ + public static Long generateId() { + return idWorker.nextId(); + } + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java new file mode 100644 index 0000000..6023dd5 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -0,0 +1,130 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.Map; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormMap { + private static final Log logger = LogFactory.get(); + + /** + * 解析日志,并补全 + * + * @param jsonMap kafka Topic消费原始日志并解析 + * @return 补全后的日志 + */ + @SuppressWarnings("unchecked") + public static Map dealCommonMessage(Map jsonMap) { + try { + JsonParseUtil.dropJsonField(jsonMap); + for (String[] strings : JsonParseUtil.getJobList()) { + //用到的参数的值 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param); + } + return jsonMap; + } catch (RuntimeException e) { + logger.error("TransForm logs failed,The exception is :" + e); + return null; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param jsonMap 原始日志解析map + * @param appendToKeyName 需要补全的字段的key + * @param appendTo 需要补全的字段的值 + * @param logValue 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); + } + break; + case "geo_asn": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); + } + break; + case "geo_ip_country": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); + } + break; + case "set_value": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendTo == null && logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); + } + break; + case "radius_match": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString())); + } + break; + case "decode_of_base64": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (logValue != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); + } + break; + case "app_match": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java new file mode 100644 index 0000000..95f6a89 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -0,0 +1,132 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.Map; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormTypeMap { + private static final Log logger = LogFactory.get(); + + /** + * 解析日志,并补全 + * + * @param message kafka Topic原始日志 + * @return 补全后的日志 + */ + @SuppressWarnings("unchecked") + public static Map dealCommonMessage(Map message) { + try { + Map jsonMap = JsonParseUtil.typeTransform(message); + for (String[] strings : JsonParseUtil.getJobList()) { + //用到的参数的值 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param); + } + return jsonMap; + } catch (RuntimeException e) { + logger.error("TransForm logs failed,The exception is :" + e); + return null; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param jsonMap 原始日志解析map + * @param appendToKeyName 需要补全的字段的key + * @param appendToKeyValue 需要补全的字段的值 + * @param logValue 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) { + switch (function) { + case "current_timestamp": + if (!(appendToKeyValue instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + //版本规划暂不实现TSG-22.01 +// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId())); + break; + case "geo_ip_detail": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); + } + break; + case "geo_asn": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); + } + break; + case "geo_ip_country": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); + } + break; + case "set_value": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendToKeyValue == null && logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); + } + break; + case "radius_match": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString())); + } + break; + case "decode_of_base64": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (logValue != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); + } + break; + case "app_match": + if (logValue != null && appendToKeyValue == null) { +// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java new file mode 100644 index 0000000..bc9e893 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -0,0 +1,297 @@ +package com.zdjizhi.utils.general; + +import cn.hutool.core.codec.Base64; +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.FormatUtils; +import com.zdjizhi.utils.IpLookupV2; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.app.AppUtils; +import com.zdjizhi.utils.hbase.HBaseUtils; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.TypeUtils; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author qidaijie + */ +class TransFunction { + private static final Log logger = LogFactory.get(); + + /** + * 校验数字正则 + */ + private static final Pattern PATTERN = Pattern.compile("[0-9]*"); + + /** + * IP定位库工具类 + */ + private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) + .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") + .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") + .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") + .build(); + + /** + * 生成当前时间戳的操作 + */ + static long getCurrentTime() { + + return System.currentTimeMillis() / 1000; + } + + /** + * CityHash64算法 + * 版本规划暂不实现-TSG22.01 + * + * @param data 原始数据 + * @return 散列结果 + */ + @Deprecated + static BigInteger getDecimalHash(long data) { + byte[] dataBytes = String.valueOf(data).getBytes(); + long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); + String decimalValue = Long.toUnsignedString(hashValue, 10); + BigInteger result = new BigInteger(decimalValue); + return result; + } + + /** + * 根据clientIp获取location信息 + * + * @param ip client IP + * @return ip地址详细信息 + */ + static String getGeoIpDetail(String ip) { + return ipLookup.cityLookupDetail(ip); + } + + /** + * 根据ip获取asn信息 + * + * @param ip client/server IP + * @return ASN + */ + static String getGeoAsn(String ip) { + return ipLookup.asnLookup(ip); + } + + /** + * 根据ip获取country信息 + * + * @param ip server IP + * @return 国家 + */ + static String getGeoIpCountry(String ip) { + + return ipLookup.countryLookup(ip); + } + + + /** + * radius借助HBase补齐 + * + * @param ip client IP + * @return account + */ + static String radiusMatch(String ip) { + return HBaseUtils.getAccount(ip.trim()); + } + + /** + * appId与缓存中对应关系补全appName + * + * @param appIds app id 列表 + * @return appName + */ + @Deprecated + static String appMatch(String appIds) { + try { + String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); + return AppUtils.getAppName(Integer.parseInt(appId)); + } catch (NumberFormatException | ClassCastException exception) { + logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds); + return ""; + } + } + + /** + * 解析顶级域名 + * + * @param domain 初始域名 + * @return 顶级域名 + */ + static String getTopDomain(String domain) { + try { + return FormatUtils.getTopPrivateDomain(domain); + } catch (StringIndexOutOfBoundsException outException) { + logger.error("解析顶级域名异常,异常域名:" + domain); + return ""; + } + } + + /** + * 根据编码解码base64 + * + * @param message base64 + * @param charset 编码 + * @return 解码字符串 + */ + static String decodeBase64(String message, Object charset) { + String result = ""; + try { + if (StringUtil.isNotBlank(message)) { + if (charset == null) { + result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); + } else { + result = Base64.decodeStr(message, charset.toString()); + } + } + } catch (RuntimeException rune) { + logger.error("解析 Base64 异常,异常信息:" + rune); + } + return result; + } + + /** + * 根据表达式解析json + * + * @param message json + * @param expr 解析表达式 + * @return 解析结果 + */ + static String flattenSpec(String message, String expr) { + String flattenResult = ""; + try { + if (StringUtil.isNotBlank(expr)) { + ArrayList read = JsonPath.parse(message).read(expr); + if (read.size() >= 1) { + flattenResult = read.get(0); + } + } + } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) { + logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e); + } + return flattenResult; + } + + + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param object 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + static Object isJsonValue(Object object, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(object, param.substring(2)); + } else { + return param; + } + } + + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param jsonMap 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + static Object isJsonValue(Map jsonMap, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(jsonMap, param.substring(2)); + } else { + return param; + } + } + + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param object 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + static Object condition(Object object, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(object, norms[0]); + Object resultA = isJsonValue(object, split[1]); + Object resultB = isJsonValue(object, split[2]); + if (direction instanceof Number) { + result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); + } else if (direction instanceof String) { + result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); + } + } + } catch (RuntimeException e) { + logger.error("IF 函数执行异常,异常信息:" + e); + } + return result; + } + + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param jsonMap 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + static Object condition(Map jsonMap, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, norms[0]); + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (direction instanceof Number) { + result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(norms[1]) ? resultA : resultB; + } + } + } catch (RuntimeException e) { + logger.error("IF 函数执行异常,异常信息:" + e); + } + return result; + } + + + /** + * 设置固定值函数 若为数字则转为long返回 + * + * @param param 默认值 + * @return 返回数字或字符串 + */ + static Object setValue(String param) { + try { + Matcher isNum = PATTERN.matcher(param); + if (isNum.matches()) { + return Long.parseLong(param); + } else { + return param; + } + } catch (RuntimeException e) { + logger.error("SetValue 函数异常,异常信息:" + e); + } + return null; + } +} diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java new file mode 100644 index 0000000..6aa904f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -0,0 +1,208 @@ +package com.zdjizhi.utils.hbase; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * HBase 工具类 + * + * @author qidaijie + */ + +public class HBaseUtils { + private static final Log logger = LogFactory.get(); + private static Map subIdMap = new ConcurrentHashMap<>(16); + private static Connection connection; + private static Long time; + + private static HBaseUtils hBaseUtils; + + private static void getInstance() { + hBaseUtils = new HBaseUtils(); + } + + + /** + * 构造函数-新 + */ + private HBaseUtils() { + //获取连接 + getConnection(); + //拉取所有 + getAll(); + //定时更新 + updateCache(); + } + + private static void getConnection() { + try { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + connection = ConnectionFactory.createConnection(configuration); + time = System.currentTimeMillis(); + logger.warn("HBaseUtils get HBase connection,now to getAll()."); + } catch (IOException ioe) { + logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); + } + } + + /** + * 更新变量 + */ + private static void change() { + if (hBaseUtils == null) { + getInstance(); + } + long nowTime = System.currentTimeMillis(); + timestampsFilter(time - 1000, nowTime + 500); + } + + + /** + * 获取变更内容 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + */ + private static void timestampsFilter(Long startTime, Long endTime) { + Long begin = System.currentTimeMillis(); + Table table = null; + ResultScanner scanner = null; + Scan scan2 = new Scan(); + try { + table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); + scan2.setTimeRange(startTime, endTime); + scanner = table.getScanner(scan2); + for (Result result : scanner) { + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim(); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim(); + if (acctStatusType == 1) { + if (subIdMap.containsKey(framedIp)) { + boolean same = account.equals(subIdMap.get(framedIp)); + if (!same) { + subIdMap.put(framedIp, account); + } + } else { + subIdMap.put(framedIp, account); + } + } else if (acctStatusType == 2) { + subIdMap.remove(framedIp); + } + } + Long end = System.currentTimeMillis(); + logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size()); + logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime); + time = endTime; + } catch (IOException ioe) { + logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<==="); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + logger.error("HBase Table Close ERROR! Exception message is:" + e); + } + } + } + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + long begin = System.currentTimeMillis(); + try { + Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + if (acctStatusType == 1) { + subIdMap.put(framedIp, account); + } + } + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin)); + scanner.close(); + } catch (IOException ioe) { + logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); + } + } + + /** + * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie + */ + private void updateCache() { +// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, +// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build()); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } + } catch (RuntimeException e) { + logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); + } + } + }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + } + + + /** + * 获取 account + * + * @param clientIp client_ip + * @return account + */ + public static String getAccount(String clientIp) { + + if (hBaseUtils == null) { + getInstance(); + } + return subIdMap.get(clientIp); + + } + + private static int getAcctStatusType(Result result) { + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + return 1; + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..1adb1d1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java @@ -0,0 +1,77 @@ +package com.zdjizhi.utils.http; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + private static final Log logger = LogFactory.get(); + + /** + * 请求网关获取schema + * + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + CloseableHttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(get); + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); + } + + return entityStringBuilder.toString(); + } + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); + } + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + return ""; + } +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..f477848 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -0,0 +1,372 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; +import java.util.concurrent.Executor; + + +/** + * 使用FastJson解析json的工具类 + * + * @author qidaijie + */ +public class JsonParseUtil { + private static final Log logger = LogFactory.get(); + private static Properties propNacos = new Properties(); + + /** + * 获取需要删除字段的列表 + */ + private static ArrayList dropList = new ArrayList<>(); + + /** + * 在内存中加载反射类用的map + */ + private static HashMap jsonFieldsMap; + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList; + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + try { + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = FlowWriteConfig.NACOS_DATA_ID; + String group = FlowWriteConfig.NACOS_GROUP; + String schema = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(schema)) { + jsonFieldsMap = getMapFromHttp(schema); + jobList = getJobListFromHttp(schema); + } + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + if (StringUtil.isNotBlank(configMsg)) { + clearCache(); + jsonFieldsMap = getMapFromHttp(configMsg); + jobList = getJobListFromHttp(configMsg); + } + } + }); + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } + + /** + * 模式匹配,给定一个类型字符串返回一个类类型 + * + * @param type 类型 + * @return 类类型 + */ + + private static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + /** + * 获取属性值的方法 + * + * @param obj 对象 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + try { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 获取属性值的方法 + * + * @param jsonMap 原始日志 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Map jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 更新属性值的方法 + * + * @param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Map jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 更新属性值的方法 + * + * @param obj 对象 + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 类型转换 + * + * @param jsonMap 原始日志map + */ + public static Map typeTransform(Map jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (jsonFieldsMap.containsKey(key)) { + String simpleName = jsonFieldsMap.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); + } + } + } + return tmpMap; + } + + public static ArrayList getJobList() { + return jobList; + } + + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + *

+ * // * @param http 网关schema地址 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + private static HashMap getMapFromHttp(String schema) { + HashMap map = new HashMap<>(16); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(schema); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String filedStr = field.toString(); + if (checkKeepField(filedStr)) { + String name = JsonPath.read(filedStr, "$.name").toString(); + String type = JsonPath.read(filedStr, "$.type").toString(); + if (type.contains("{")) { + type = JsonPath.read(filedStr, "$.type.type").toString(); + } + //组合用来生成实体类的map + map.put(name, getClassName(type)); + } else { + dropList.add(filedStr); + } + } + return map; + } + + /** + * 判断字段是否需要保留 + * + * @param message 单个field-json + * @return true or false + */ + private static boolean checkKeepField(String message) { + boolean isKeepField = true; + boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); + if (isHiveDoc) { + boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); + if (isHiveVi) { + String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + if (FlowWriteConfig.VISIBILITY.equals(visibility)) { + isKeepField = false; + } + } + } + return isKeepField; + } + + /** + * 删除schema内指定的无效字段(jackson) + * + * @param jsonMap + */ + public static void dropJsonField(Map jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } + + /** + * 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @param schema 日志schema + * @return 任务列表 + */ + private static ArrayList getJobListFromHttp(String schema) { + ArrayList list = new ArrayList<>(); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(schema); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if (JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + + /** + * 在配置变动时,清空缓存重新获取 + */ + private static void clearCache() { + jobList.clear(); + jsonFieldsMap.clear(); + dropList.clear(); + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java new file mode 100644 index 0000000..79087a6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java @@ -0,0 +1,129 @@ +package com.zdjizhi.utils.json; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.FlowWriteException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtil { + /** + * 类型转换 + * + * @param jsonMap 原始日志map + */ + + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new FlowWriteException("can not cast to map, value : " + value); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new FlowWriteException("can not cast to List, value : " + value); + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return Long value + */ + static long checkLongValue(Object value) { + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * Double 类型校验转换方法 + * + * @param value json value + * @return Double value + */ + static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return int value + */ + static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + if (intVal == null) { + return 0; + } + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..b13627f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,171 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.FlowWriteException; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Double类型判断方法 + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to double, value : " + value); + } + + /** + * Long类型判断方法 + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to long, value : " + value); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..ce059f8 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,48 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param servers kafka 连接信息 + * @param properties kafka 连接配置信息 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java new file mode 100644 index 0000000..f935689 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -0,0 +1,74 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; + + +import java.util.Map; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class KafkaConsumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS); + properties.put("group.id", FlowWriteConfig.GROUP_ID); + properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES); + properties.put("partition.discovery.interval.ms", "10000"); + CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties); + + return properties; + } + + /** + * 用户序列化kafka数据,增加 kafka Timestamp内容。 + * + * @return kafka logs -> map + */ + @SuppressWarnings("unchecked") + public static FlinkKafkaConsumer> myDeserializationConsumer() { + FlinkKafkaConsumer> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, + new TimestampDeserializationSchema(), createConsumerConfig()); + + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + + //从消费组当前的offset开始消费 + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } + + /** + * 官方序列化kafka数据 + * + * @return kafka logs + */ + public static FlinkKafkaConsumer flinkConsumer() { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + + //从消费组当前的offset开始消费 + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java new file mode 100644 index 0000000..a431c09 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -0,0 +1,82 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class KafkaProducer { + + + private static Properties createPercentProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", FlowWriteConfig.RETRIES); + properties.put("linger.ms", FlowWriteConfig.LINGER_MS); + properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", FlowWriteConfig.BATCH_SIZE); + properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY); + properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE); + properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS, properties); + + return properties; + } + + + + private static Properties createTrafficFileMetaProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", FlowWriteConfig.RETRIES); + properties.put("linger.ms", FlowWriteConfig.LINGER_MS); + properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", FlowWriteConfig.BATCH_SIZE); + properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY); + properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE); + properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS, properties); + + return properties; + } + + + + public static FlinkKafkaProducer getPercentKafkaProducer() { + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( + FlowWriteConfig.PERCENT_KAFKA_TOPIC, + new SimpleStringSchema(), + createPercentProducerConfig(), Optional.empty()); + + kafkaProducer.setLogFailuresOnly(false); + +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } + public static FlinkKafkaProducer getTrafficFileMetaKafkaProducer() { + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( + FlowWriteConfig.FILE_DATA_SINK_KAFKA_TOPIC, + new SimpleStringSchema(), + createTrafficFileMetaProducerConfig(), Optional.empty()); + + kafkaProducer.setLogFailuresOnly(false); + +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java new file mode 100644 index 0000000..920ffab --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java @@ -0,0 +1,48 @@ +package com.zdjizhi.utils.kafka; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2022/3/89:42 + */ +public class TimestampDeserializationSchema implements KafkaDeserializationSchema { + private static final Log logger = LogFactory.get(); + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Map.class); + } + + @Override + public boolean isEndOfStream(Object nextElement) { + return false; + } + + @Override + @SuppressWarnings("unchecked") + public Map deserialize(ConsumerRecord record) throws Exception { + if (record != null) { + try { + long timestamp = record.timestamp() / 1000; + String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING); + Map json = (Map) JsonMapper.fromJsonString(value, Map.class); + json.put("common_ingestion_time", timestamp); + return json; + } catch (RuntimeException e) { + logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage()); + } + } + return null; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..8d91027 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -0,0 +1,84 @@ +package com.zdjizhi.utils.system; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + private static Properties propKafka = new Properties(); + private static Properties propService = new Properties(); + private static Properties propfiletype = new Properties(); + private static Map fileTypeMap; + + public static boolean judgeFileType(String filetype){ + return fileTypeMap.containsKey(filetype); + } + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propKafka.getProperty(key); + } else { + return null; + } + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + propfiletype.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("file_type.properties")); + fileTypeMap = new HashMap((Map) propfiletype); + } catch (IOException | RuntimeException e) { + propKafka = null; + propService = null; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java new file mode 100644 index 0000000..2afab03 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java @@ -0,0 +1,190 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * @author qidaijie + */ +public class DistributedLock implements Lock, Watcher { + private static final Log logger = LogFactory.get(); + + private ZooKeeper zk = null; + /** + * 根节点 + */ + private final String ROOT_LOCK = "/locks"; + /** + * 竞争的资源 + */ + private String lockName; + /** + * 等待的前一个锁 + */ + private String waitLock; + /** + * 当前锁 + */ + private String currentLock; + /** + * 计数器 + */ + private CountDownLatch countDownLatch; + + private int sessionTimeout = 2000; + + private List exceptionList = new ArrayList(); + + /** + * 配置分布式锁 + * + * @param config 连接的url + * @param lockName 竞争资源 + */ + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + try { + // 连接zookeeper + zk = new ZooKeeper(config, sessionTimeout, this); + Stat stat = zk.exists(ROOT_LOCK, false); + if (stat == null) { + // 如果根节点不存在,则创建根节点 + zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + logger.error("Node already exists!"); + } + } + + // 节点监视器 + @Override + public void process(WatchedEvent event) { + if (this.countDownLatch != null) { + this.countDownLatch.countDown(); + } + } + + @Override + public void lock() { + if (exceptionList.size() > 0) { + throw new LockException(exceptionList.get(0)); + } + try { + if (this.tryLock()) { + logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + } else { + // 等待锁 + waitForLock(waitLock, sessionTimeout); + } + } catch (InterruptedException | KeeperException e) { + logger.error("获取锁异常" + e); + } + } + + @Override + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) { + throw new LockException("锁名有误"); + } + // 创建临时有序节点 + currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + // 取所有子节点 + List subNodes = zk.getChildren(ROOT_LOCK, false); + // 取出所有lockName的锁 + List lockObjects = new ArrayList(); + for (String node : subNodes) { + String tmpNode = node.split(splitStr)[0]; + if (tmpNode.equals(lockName)) { + lockObjects.add(node); + } + } + Collections.sort(lockObjects); + // 若当前节点为最小节点,则获取锁成功 + if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { + return true; + } + // 若不是最小节点,则找到自己的前一个节点 + String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); + waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); + } catch (InterruptedException | KeeperException e) { + logger.error("获取锁过程异常" + e); + } + return false; + } + + + @Override + public boolean tryLock(long timeout, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitLock, timeout); + } catch (KeeperException | InterruptedException | RuntimeException e) { + logger.error("判断是否锁定异常" + e); + } + return false; + } + + // 等待锁 + private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { + Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); + + if (stat != null) { + this.countDownLatch = new CountDownLatch(1); + // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 + this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); + this.countDownLatch = null; + } + return true; + } + + @Override + public void unlock() { + try { + zk.delete(currentLock, -1); + currentLock = null; + zk.close(); + } catch (InterruptedException | KeeperException e) { + logger.error("关闭锁异常" + e); + } + } + + @Override + public Condition newCondition() { + return null; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 0000000..9efbd46 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,140 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * @author qidaijie + * @Package cn.ac.iie.utils.zookeeper + * @Description: + * @date 2020/11/1411:28 + */ +public class ZookeeperUtils implements Watcher { + private static final Log logger = LogFactory.get(); + private static final int ID_MAX = 255; + + private ZooKeeper zookeeper; + + private static final int SESSION_TIME_OUT = 20000; + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.SyncConnected) { + countDownLatch.countDown(); + } + } + + + /** + * 修改节点信息 + * + * @param path 节点路径 + */ + public int modifyNode(String path, String zookeeperIp) { + createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); + int workerId = 0; + try { + connectZookeeper(zookeeperIp); + Stat stat = zookeeper.exists(path, true); + workerId = Integer.parseInt(getNodeDate(path)); + if (workerId > ID_MAX) { + workerId = 0; + zookeeper.setData(path, "1".getBytes(), stat.getVersion()); + } else { + String result = String.valueOf(workerId + 1); + if (stat != null) { + zookeeper.setData(path, result.getBytes(), stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } + } + } catch (KeeperException | InterruptedException e) { + logger.error("modify error Can't modify," + e); + } finally { + closeConn(); + } + logger.warn("workerID is:" + workerId); + return workerId; + } + + /** + * 连接zookeeper + * + * @param host 地址 + */ + public void connectZookeeper(String host) { + try { + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + countDownLatch.await(); + } catch (IOException | InterruptedException e) { + logger.error("Connection to the Zookeeper Exception! message:" + e); + } + } + + /** + * 关闭连接 + */ + public void closeConn() { + try { + if (zookeeper != null) { + zookeeper.close(); + } + } catch (InterruptedException e) { + logger.error("Close the Zookeeper connection Exception! message:" + e); + } + } + + /** + * 获取节点内容 + * + * @param path 节点路径 + * @return 内容/异常null + */ + public String getNodeDate(String path) { + String result = null; + Stat stat = new Stat(); + try { + byte[] resByte = zookeeper.getData(path, true, stat); + + result = StrUtil.str(resByte, "UTF-8"); + } catch (KeeperException | InterruptedException e) { + logger.error("Get node information exception" + e); + } + return result; + } + + /** + * @param path 节点创建的路径 + * @param date 节点所存储的数据的byte[] + * @param acls 控制权限策略 + */ + public void createNode(String path, byte[] date, List acls, String zookeeperIp) { + try { + connectZookeeper(zookeeperIp); + Stat exists = zookeeper.exists(path, true); + if (exists == null) { + Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); + if (existsSnowflakeld == null) { + zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); + } + zookeeper.create(path, date, acls, CreateMode.PERSISTENT); + } else { + logger.warn("Node already exists ! Don't need to create"); + } + } catch (KeeperException | InterruptedException e) { + logger.error(e); + } finally { + closeConn(); + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + diff --git a/src/main/logback.xml b/src/main/logback.xml new file mode 100644 index 0000000..a508b6b --- /dev/null +++ b/src/main/logback.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + ${LOG_PATTERN} + + + + + + + ${LOG_FILE_PATH} + + 30 + + + 20MB + + + + + ${LOG_PATTERN} + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java new file mode 100644 index 0000000..170086c --- /dev/null +++ b/src/test/java/com/zdjizhi/EncryptorTest.java @@ -0,0 +1,35 @@ +package com.zdjizhi; + +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1610:55 + */ +public class EncryptorTest { + + + @Test + public void passwordTest(){ + StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + // 配置加密解密的密码/salt值 + encryptor.setPassword("galaxy"); + // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p + String pin = "galaxy2019"; + String encPin = encryptor.encrypt(pin); + String user = "admin"; + String encUser = encryptor.encrypt(user); + System.out.println(encPin); + System.out.println(encUser); + // 再进行解密:raw_password + String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ"); + String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw=="); + + System.out.println("The username is: "+rawPwd); + System.out.println("The pin is: "+rawUser); + } + +} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..2dd5837 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,52 @@ +package com.zdjizhi; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.IpLookupV2; +import com.zdjizhi.utils.general.CityHash; +import org.junit.Test; + +import java.math.BigInteger; +import java.util.Calendar; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/11/611:38 + */ +public class FunctionTest { + + private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) + .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") + .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") + .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") + .build(); + + @Test + public void CityHashTest() { + + byte[] dataBytes = String.valueOf(613970406986188816L).getBytes(); + long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); + String decimalValue = Long.toUnsignedString(hashValue, 10); + BigInteger result = new BigInteger(decimalValue); + System.out.println(result); + } + + @Test + public void ipLookupTest() { + String ip = "61.144.36.144"; + System.out.println(ipLookup.cityLookupDetail(ip)); + System.out.println(ipLookup.countryLookup(ip)); + } + + @Test + public void timestampTest(){ + Calendar cal = Calendar.getInstance(); + Long utcTime=cal.getTimeInMillis(); + System.out.println(utcTime); + System.out.println(System.currentTimeMillis()); + } +} diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java new file mode 100644 index 0000000..5f94e32 --- /dev/null +++ b/src/test/java/com/zdjizhi/HBaseTest.java @@ -0,0 +1,54 @@ +package com.zdjizhi; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/12/310:42 + */ +public class HBaseTest { + + @Test + public void getColumn() { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + int acctStatusType; + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + acctStatusType = 3; + } + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); +// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java new file mode 100644 index 0000000..cd7ada3 --- /dev/null +++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java @@ -0,0 +1,79 @@ +package com.zdjizhi.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.json + * @Description: + * @date 2022/3/2410:22 + */ +public class JsonPathTest { + private static final Log logger = LogFactory.get(); + + private static Properties propNacos = new Properties(); + + /** + * 获取需要删除字段的列表 + */ + private static ArrayList dropList = new ArrayList<>(); + + /** + * 在内存中加载反射类用的map + */ + private static HashMap map; + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList; + + private static String schema; + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + try { + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = FlowWriteConfig.NACOS_DATA_ID; + String group = FlowWriteConfig.NACOS_GROUP; + String config = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(config)) { + schema = config; + } + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } + + @Test + public void parseSchemaGetFields() { + DocumentContext parse = JsonPath.parse(schema); + List fields = parse.read("$.fields[*]"); + for (Object field : fields) { + String name = JsonPath.read(field, "$.name").toString(); + String type = JsonPath.read(field, "$.type").toString(); + } + } +} diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java new file mode 100644 index 0000000..52b99e5 --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java @@ -0,0 +1,100 @@ +package com.zdjizhi.nacos; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import org.junit.Test; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; +import java.util.concurrent.Executor; + + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1016:58 + */ +public class NacosTest { + + /** + * + * com.alibaba.nacos + * nacos-client + * 1.2.0 + * + */ + + private static Properties properties = new Properties(); + /** + * config data id = config name + */ + private static final String DATA_ID = "test"; + /** + * config group + */ + private static final String GROUP = "Galaxy"; + + private void getProperties() { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + } + + + @Test + public void GetConfigurationTest() { + try { + getProperties(); + ConfigService configService = NacosFactory.createConfigService(properties); + String content = configService.getConfig(DATA_ID, GROUP, 5000); + Properties nacosConfigMap = new Properties(); + nacosConfigMap.load(new StringReader(content)); + System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); + } catch (NacosException | IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + @Test + public void ListenerConfigurationTest() { + getProperties(); + try { + //first get config + ConfigService configService = NacosFactory.createConfigService(properties); + String config = configService.getConfig(DATA_ID, GROUP, 5000); + System.out.println(config); + + //start listenner + configService.addListener(DATA_ID, GROUP, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + System.out.println(configMsg); + } + }); + } catch (NacosException e) { + e.printStackTrace(); + } + + //keep running,change nacos config,print new config + while (true) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java new file mode 100644 index 0000000..c81b809 --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/SchemaListener.java @@ -0,0 +1,136 @@ +package com.zdjizhi.nacos; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.nacos + * @Description: + * @date 2022/3/1714:57 + */ +public class SchemaListener { + + private static Properties properties = new Properties(); + private static ArrayList jobList; + + + static { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + + try { + ConfigService configService = NacosFactory.createConfigService(properties); + String dataId = "session_record.json"; + String group = "Galaxy"; + jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + jobList = getJobListFromHttp(configMsg); + } + }); + } catch (NacosException e) { + e.printStackTrace(); + } + } + + + @Test + public void dealCommonMessage() { + //keep running,change nacos config,print new config + while (true) { + try { + System.out.println(Arrays.toString(jobList.get(0))); + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + private static ArrayList getJobListFromHttp(String schema) { + ArrayList list = new ArrayList<>(); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(schema); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if (JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + +}