diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c7ddb06
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,301 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.zdjizhi</groupId>
+  <artifactId>log-completion-schema</artifactId>
+  <version>20210728</version>
+
+  <name>log-completion-schema</name>
+  <url>http://www.example.com</url>
+
+  <repositories>
+    <repository>
+      <id>nexus</id>
+      <name>Team Nexus Repository</name>
+      <url>http://192.168.40.125:8099/content/groups/public</url>
+    </repository>
+
+    <repository>
+      <id>maven-ali</id>
+      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+      <snapshots>
+        <enabled>true</enabled>
+        <!-- only the policy value "fail" survived extraction; the enclosing element is assumed -->
+        <checksumPolicy>fail</checksumPolicy>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <flink.version>1.13.1</flink.version>
+    <hadoop.version>2.7.1</hadoop.version>
+    <!-- an additional property valued 1.0.0 appeared here; its name was lost in extraction -->
+    <hbase.version>2.2.3</hbase.version>
+    <scope.type>provided</scope.type>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.2</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer
+                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>io.github.zlika</groupId>
+        <artifactId>reproducible-build-maven-plugin</artifactId>
+        <version>0.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>strip-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>properties</directory>
+        <includes>
+          <include>**/*.properties</include>
+          <include>**/*.xml</include>
+        </includes>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>src/main/java</directory>
+        <includes>
+          <include>log4j.properties</include>
+        </includes>
+        <filtering>false</filtering>
+      </resource>
+    </resources>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.zdjizhi</groupId>
+      <artifactId>galaxy</artifactId>
+      <version>1.0.6</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.alibaba</groupId>
+      <artifactId>fastjson</artifactId>
+      <version>1.2.70</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table</artifactId>
+      <version>${flink.version}</version>
+      <type>pom</type>
+      <scope>${scope.type}</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+      <scope>${scope.type}</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.11</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>${scope.type}</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka_2.11</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>${scope.type}</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.10</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>cglib</groupId>
+      <artifactId>cglib-nodep</artifactId>
+      <version>3.2.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.3.2</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.jayway.jsonpath</groupId>
+      <artifactId>json-path</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_pushgateway</artifactId>
+      <version>0.9.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>cn.hutool</groupId>
+      <artifactId>hutool-all</artifactId>
+      <version>5.5.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..d82130d
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,29 @@
+#Number of producer retries
+retries=0
+
+#Once a batch is created, it must be sent after at most this long, whether or not the batch is full
+linger.ms=10
+
+#If no response is received before this timeout, the client will resend the request if necessary
+request.timeout.ms=30000
+
+#Producers send records in batches; batch size in bytes, default: 16384
+batch.size=262144
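+#Note: 262144 bytes = 256 KB per batch; together with linger.ms=10 above, the producer trades
+#up to 10 ms of extra latency for fuller batches, so the two settings should be tuned together.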
+
+#Producer-side buffer size for caching messages
+#64M
+#buffer.memory=67108864
+#128M
+buffer.memory=134217728
+
+#Maximum size of a single request sent to the Kafka broker, default 1048576
+#5M
+#max.request.size=5242880
+#10M
+max.request.size=10485760
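+#Note: 10485760 bytes = 10 MB. The broker must accept messages of at least this size
+#(broker message.max.bytes / topic-level max.message.bytes), otherwise oversized requests are rejected.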
+
+#hbase table name
+hbase.table.name=subscriber_info
+
+#Default mail charset
+mail.default.charset=UTF-8
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..9bb2f84
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,74 @@
+#-------------------------------- Address configuration ------------------------------#
+
+#Input Kafka broker addresses
+input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+
+#Output Kafka broker addresses
+output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+
+#ZooKeeper addresses, used for allocating log_id
+zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+
+#HBase ZooKeeper addresses, used to connect to HBase
+hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+
+#-------------------------------- HTTP / IP location library ------------------------------#
+#IP location library path
+ip.library=/home/bigdata/topology/dat/
+
+#Gateway schema URL
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
+
+#Gateway APP_ID fetch API
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
+
+#-------------------------------- Kafka consumer group settings ------------------------------#
+
+#Kafka topic the data is received from
+input.kafka.topic=CONNECTION-RECORD-LOG
+
+#Output topic for completed data
+output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
+
+#Consumer group id: stores the consumed offsets for this topology (can be named after it) so the next run does not re-read data
+group.id=connection-record-flink-20210809
+
+#Producer compression mode: none or snappy
+producer.kafka.compression.type=none
+
+#Producer ack
+producer.ack=1
+
+#client-id of the consumer reading from Kafka
+consumer.client.id=consumer-connection-record
+
+#client-id of the producer writing back to Kafka
+producer.client.id=producer-connection-record
+
+#-------------------------------- Topology configuration ------------------------------#
+
+#Consumer parallelism
+consumer.parallelism=3
+
+#Map function parallelism
+map.parallelism=3
+
+#Producer parallelism
+producer.parallelism=3
+
+#Data center id, value range (0-63)
+data.center.id.num=0
+
+#HBase cache refresh interval (seconds); 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#app_id cache refresh interval (seconds); 0 disables cache updates
+app.tick.tuple.freq.secs=0
+
+#-------------------------------- Default values ------------------------------#
+
+#Default mail charset
+mail.default.charset=UTF-8
+
+#0 = output logs as-is without completion, 1 = completion required
+log.need.complete=1
diff --git a/src/main/java/com/zdjizhi/common/DefaultProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
new file mode 100644
index 0000000..b98ea53
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.FlowWriteConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class DefaultProConfig {
+
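+ // Note: the first argument to get*Property appears to select the source file
+ // (0 = service_flow_config.properties, 1 = default_config.properties); inferred from usage, not confirmed.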
+
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
new file mode 100644
index 0000000..bf82757
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -0,0 +1,59 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.FlowWriteConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class FlowWriteConfig {
+
+ public static final int IF_PARAM_LENGTH = 3;
+ public static final String VISIBILITY = "disabled";
+ public static final String FORMAT_SPLITTER = ",";
+ public static final String IS_JSON_KEY_TAG = "$.";
+ public static final String IF_CONDITION_SPLITTER = "=";
+ public static final String MODEL = "remote";
+ public static final String PROTOCOL_SPLITTER = "\\.";
+ /**
+ * System
+ */
+ public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism");
+ public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism");
+ public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism");
+ public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
+ public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
+ public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+ public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+
+
+
+ /**
+ * kafka
+ */
+ public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
+ public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
+ public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
+
+ /**
+ * Kafka rate-limiting configuration - 20201117
+ */
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
+ public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
+
+ /**
+ * http
+ */
+ public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
+ public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http");
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..a9b38ca
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.functions.FilterNullFunction;
+import com.zdjizhi.utils.functions.MapCompletedFunction;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description: Reads raw logs from Kafka, completes them, and writes the results back to Kafka
+ * @date 2021/5/20 16:42
+ */
+public class LogFlowWriteTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
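+ //getExecutionEnvironment() returns a local environment when run in the IDE and the cluster environment when submitted with "flink run"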
+
+ //Enable checkpointing; the interval sets how often checkpoints are triggered (in milliseconds)
+// environment.enableCheckpointing(5000);
+
+ DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
+ .setParallelism(FlowWriteConfig.CONSUMER_PARALLELISM);
+
+ if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+ //Process the raw logs: completion, transformation, etc.
+ DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction())
+ .name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM);
+ //Filter out empty records so they are not sent to Kafka
+ DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData");
+ //Send the data to Kafka
+ result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ .setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ } else {
+ DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
+ result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ }
+
+ try {
+ environment.execute(args[0]);
+ } catch (Exception e) {
+ logger.error("This Flink task failed to start! Exception information is: " + e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
new file mode 100644
index 0000000..0caeb25
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -0,0 +1,123 @@
+package com.zdjizhi.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AppId utility class
+ *
+ * @author qidaijie
+ */
+
+public class AppUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
+ private static AppUtils appUtils;
+
+ private static void getAppInstance() {
+ appUtils = new AppUtils();
+ }
+
+
+ /**
+ * Constructor: starts the scheduled cache refresh
+ */
+ private AppUtils() {
+ //scheduled refresh
+ updateAppIdCache();
+ }
+
+ /**
+ * Refresh the cached mapping
+ */
+ private static void change() {
+ if (appUtils == null) {
+ getAppInstance();
+ }
+ timestampsFilter();
+ }
+
+
+ /**
+ * Fetch the changed content
+ */
+ private static void timestampsFilter() {
+ try {
+ Long begin = System.currentTimeMillis();
+ String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
+ if (StringUtil.isNotBlank(schema)) {
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
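+ //"data" is expected to be an array of [appId, appName] pairs; each entry refreshes the cache in place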
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ int key = jsonArray.getInteger(0);
+ String value = jsonArray.getString(1);
+ if (appIdMap.containsKey(key)) {
+ if (!value.equals(appIdMap.get(key))) {
+ appIdMap.put(key, value);
+ }
+ } else {
+ appIdMap.put(key, value);
+ }
+ }
+ logger.warn("Updating the correspondence takes time:" + (System.currentTimeMillis() - begin));
+ logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Update cache app-id failed, exception: " + e);
+ }
+ }
+
+
+ /**
+ * Scheduled refresh: re-pulls the app-id mapping periodically
+ */
+ private void updateAppIdCache() {
+ //scheduleAtFixedRate throws on a period of 0, so skip scheduling entirely when updates are disabled
+ if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS <= 0) {
+ return;
+ }
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ change();
+ } catch (RuntimeException e) {
+ logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * Get the appName for an appId
+ *
+ * @param appId app_id
+ * @return appName
+ */
+ public static String getAppName(int appId) {
+
+ if (appUtils == null) {
+ getAppInstance();
+ }
+
+ if (appIdMap.containsKey(appId)) {
+ return appIdMap.get(appId);
+ } else {
+ logger.warn("AppMap get appName is null, ID is :" + appId);
+ return "";
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
new file mode 100644
index 0000000..67c88f0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.exception
+ * @Description: Runtime exception type for the log completion flow
+ * @date 2021/3/25 9:42
+ */
+public class FlowWriteException extends RuntimeException {
+
+ public FlowWriteException() {
+ }
+
+ public FlowWriteException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Filters out blank/empty messages
+ * @date 2021/5/27 15:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
new file mode 100644
index 0000000..5618159
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -0,0 +1,28 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.general.TransFormTypeMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Map function that parses and completes each log
+ * @date 2021/5/27 15:01
+ */
+public class MapCompletedFunction implements MapFunction<String, String> {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+ try {
+ return TransFormTypeMap.dealCommonMessage(logs);
+ } catch (RuntimeException e) {
+ logger.error("Exception while parsing/completing the log, exception info: " + e + "\n" + logs);
+ return "";
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
new file mode 100644
index 0000000..d203a2b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -0,0 +1,213 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.zookeeper.DistributedLock;
+import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
+
+/**
+ * Snowflake ID algorithm
+ *
+ * @author qidaijie
+ */
+public class SnowflakeId {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 64 bits in total; the first bit is the sign bit, 0 by default.
+ * Timestamp: 39 bits (~17 years); centerId (one per environment/task): 6 bits (0-63);
+ * workerId (one per process): 7 bits (0-127); sequence: 11 bits (2047/ms)
+ *
+ * Sequences per ms = (-1L ^ (-1L << 11))
+ * Max usable years = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
+ */
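+ /*
+ * Bit-layout sketch (derived from the constants below):
+ * sign(1) | timestamp delta(39) | dataCenterId(6) | workerId(7) | sequence(11)
+ * id = ((timestamp - twepoch) << 24) | (dataCenterId << 18) | (workerId << 11) | sequence
+ * e.g. delta=1000, dataCenterId=1, workerId=5, sequence=7 ->
+ * (1000 << 24) | (1 << 18) | (5 << 11) | 7 = 16777216000 + 262144 + 10240 + 7
+ */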
+ /**
+ * Epoch start (2020-11-14 00:00:00); max ~17 years
+ */
+ private final long twepoch = 1605283200000L;
+
+ /**
+ * Number of bits for the worker id
+ */
+ private final long workerIdBits = 7L;
+
+ /**
+ * Number of bits for the data center id
+ */
+ private final long dataCenterIdBits = 6L;
+
+ /**
+ * Maximum supported worker id: 127 (this shift trick quickly yields the largest decimal number a given bit width can represent)
+ * M << n = M * 2^n
+ */
+ private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
+
+ /**
+ * Maximum supported data center id: 63
+ */
+ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
+
+ /**
+ * Number of bits for the sequence within an id
+ */
+ private final long sequenceBits = 11L;
+
+ /**
+ * Worker id is shifted left by 11 bits (the sequence width)
+ */
+ private final long workerIdShift = sequenceBits;
+
+ /**
+ * Data center id is shifted left by 18 bits (11+7)
+ */
+ private final long dataCenterIdShift = sequenceBits + workerIdBits;
+
+ /**
+ * Timestamp is shifted left by 24 bits (11+7+6)
+ */
+ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
+
+ /**
+ * Mask for generating the sequence, 2047 here
+ */
+ private final long sequenceMask = -1L ^ (-1L << sequenceBits);
+
+ /**
+ * Worker machine ID (0~127)
+ */
+ private long workerId;
+
+ /**
+ * Data center ID (0~63)
+ */
+ private long dataCenterId;
+
+ /**
+ * Per-millisecond sequence (0~2047)
+ */
+ private long sequence = 0L;
+
+ /**
+ * Timestamp of the last generated ID
+ */
+ private long lastTimestamp = -1L;
+
+
+ /**
+ * Maximum tolerated clock rollback: 10s
+ */
+ private static final long rollBackTime = 10000L;
+
+
+ private static SnowflakeId idWorker;
+
+ private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ static {
+ idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
+ }
+
+ //==============================Constructors=====================================
+
+ /**
+ * Constructor
+ */
+ private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
+ DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+ try {
+ lock.lock();
+ int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
+ if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+ throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+ }
+ if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
+ throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
+ }
+ this.workerId = tmpWorkerId;
+ this.dataCenterId = dataCenterIdNum;
+ } catch (RuntimeException e) {
+ logger.error("This is not a usual error!!!===>>>" + e + "<<<===");
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // ==============================Methods==========================================
+
+ /**
+ * Get the next ID (this method is thread-safe)
+ *
+ * @return SnowflakeId
+ */
+ private synchronized long nextId() {
+ long timestamp = timeGen();
+ //Allow a bounded clock rollback: if the system time rolled back by less than rollBackTime, wait for it to catch up
+ if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ //If the current time is still before the last ID's timestamp, the clock moved back too far; throw
+ if (timestamp < lastTimestamp) {
+ throw new RuntimeException(
+ String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+ }
+
+ //Same millisecond as the last ID: advance the per-millisecond sequence
+ if (lastTimestamp == timestamp) {
+ sequence = (sequence + 1) & sequenceMask;
+ //The per-millisecond sequence overflowed
+ if (sequence == 0) {
+ //Block until the next millisecond to obtain a fresh timestamp
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ }
+ //Timestamp changed; reset the per-millisecond sequence
+ else {
+ sequence = 0L;
+ }
+
+ //Remember the timestamp of this ID
+ lastTimestamp = timestamp;
+
+ //Shift each part and OR them together into the 64-bit ID
+ return ((timestamp - twepoch) << timestampLeftShift)
+ | (dataCenterId << dataCenterIdShift)
+ | (workerId << workerIdShift)
+ | sequence;
+ }
+
+ /**
+ * Block until the next millisecond, until a new timestamp is obtained
+ *
+ * @param lastTimestamp timestamp of the last generated ID
+ * @return the current timestamp
+ */
+ protected long tilNextMillis(long lastTimestamp) {
+ long timestamp = timeGen();
+ while (timestamp <= lastTimestamp) {
+ timestamp = timeGen();
+ }
+ return timestamp;
+ }
+
+ /**
+ * Return the current time in milliseconds
+ *
+ * @return current time (ms)
+ */
+ protected long timeGen() {
+ return System.currentTimeMillis();
+ }
+
+
+ /**
+ * Static entry point
+ *
+ * @return the next generated ID
+ */
+ public static Long generateId() {
+ return idWorker.nextId();
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
new file mode 100644
index 0000000..f67b842
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -0,0 +1,144 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+/**
+ * Description: transformation/completion utility class
+ *
+ * @author qidaijie
+ */
+public class TransFormMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Fetch the job list.
+ * Each element is a string array of four parts (field carrying the format tag, field to complete, function to apply, extra parameter), e.g.:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Parse a log line and complete it
+ *
+ * @param message raw log from the Kafka topic
+ * @return the completed log
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ for (String[] strings : jobList) {
+ //value of the source parameter
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to complete
+ String appendToKeyName = strings[1];
+ //current value of the field to complete
+ Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //name of the operation function to apply
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
+ }
+ return JsonMapper.toJsonString(jsonMap);
+ } else {
+ return "";
+ }
+ } catch (RuntimeException e) {
+ logger.error("Exception while parsing/completing the log, exception info: " + e + "\n" + message);
+ return "";
+ }
+ }
+
+
+ /**
+ * Function dispatch: applies the schema-described operation to the corresponding field
+ *
+ * @param function name of the operation function to apply
+ * @param jsonMap parsed map of the raw log
+ * @param appendToKeyName key of the field to complete
+ * @param appendTo current value of the field to complete
+ * @param logValue value of the source parameter
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendTo instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendTo == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
new file mode 100644
index 0000000..26795b0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
@@ -0,0 +1,153 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+/**
+ * Description: transformation/completion utility class
+ *
+ * @author qidaijie
+ */
+public class TransFormObject {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Map used to build the reflective class, loaded in memory
+ */
+ private static HashMap<String, Class<?>> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Object generated from the map via reflection
+ */
+ private static Object mapObject = JsonParseUtil.generateObject(map);
+
+ /**
+ * Fetch the job list.
+ * Each element is a string array of four parts (field carrying the format tag, field to complete, function to apply, extra parameter), e.g.:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Parse a log line and complete it
+ *
+ * @param message raw log from the Kafka topic
+ * @return the completed log
+ */
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
+ for (String[] strings : jobList) {
+ //value of the source parameter
+ Object name = JsonParseUtil.getValue(object, strings[0]);
+ //key of the field to complete
+ String appendToKeyName = strings[1];
+ //current value of the field to complete
+ Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
+ //name of the operation function to apply
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, object, appendToKeyName, appendTo, name, param);
+ }
+ return JsonMapper.toJsonString(object);
+ } else {
+ return "";
+ }
+ } catch (RuntimeException e) {
+ logger.error("Exception while parsing/completing the log, exception info: " + e + "\n" + message);
+ return "";
+ }
+ }
+
+
+ /**
+ * Function dispatch: applies the schema-described operation to the corresponding field
+ *
+ * @param function name of the operation function to apply
+ * @param object dynamic POJO object
+ * @param appendToKeyName key of the field to complete
+ * @param appendTo current value of the field to complete
+ * @param name value of the source parameter
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendTo instanceof Long)) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
+ }
+ break;
+ case "set_value":
+ if (name != null && param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param));
+ }
+ break;
+ case "get_value":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, name);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendTo == null && name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString()));
+ }
+ break;
+ case "radius_match":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
+ }
+ break;
+ case "app_match":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (name != null && param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
new file mode 100644
index 0000000..7779da2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -0,0 +1,145 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+/**
+ * Description: transformation/completion utility class
+ *
+ * @author qidaijie
+ */
+public class TransFormTypeMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Fetch the job list.
+ * Each element is a string array of four parts (field carrying the format tag, field to complete, function to apply, extra parameter), e.g.:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Parse a log line and complete it
+ *
+ * @param message raw log from the Kafka topic
+ * @return the completed log
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ for (String[] strings : jobList) {
+ //value of the source parameter
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the field to complete
+ String appendToKeyName = strings[1];
+ //current value of the field to complete
+ Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //name of the operation function to apply
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+ }
+ return JsonMapper.toJsonString(JsonTypeUtils.typeTransform(jsonMap));
+ } else {
+ return "";
+ }
+ } catch (RuntimeException e) {
+ logger.error("Exception while parsing/completing the log, exception info: " + e + "\n" + message);
+ return "";
+ }
+ }
+
+
+ /**
+ * Function dispatch: applies the schema-described operation to the corresponding field
+ *
+ * @param function name of the operation function to apply
+ * @param jsonMap parsed map of the raw log
+ * @param appendToKeyName key of the field to complete
+ * @param appendToKeyValue current value of the field to complete
+ * @param logValue value of the source parameter
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendToKeyValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendToKeyValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
new file mode 100644
index 0000000..9fada7b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -0,0 +1,317 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.text.StrSpliter;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.hbase.HBaseUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author qidaijie
+ */
+class TransFunction {
+
+ private static final Log logger = LogFactory.get();
+
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
+
+ /**
+ * IP location library utility
+ */
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
+ * Produce the current timestamp (in seconds)
+ */
+ static long getCurrentTime() {
+
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * Look up location details for a client IP
+ *
+ * @param ip client IP
+ * @return detailed IP location info
+ */
+ static String getGeoIpDetail(String ip) {
+
+ return ipLookup.cityLookupDetail(ip);
+
+ }
+
+ /**
+ * Look up ASN info for an IP
+ *
+ * @param ip client/server IP
+ * @return ASN
+ */
+ static String getGeoAsn(String ip) {
+
+ return ipLookup.asnLookup(ip);
+ }
+
+ /**
+ * Look up the country for an IP
+ *
+ * @param ip server IP
+ * @return country
+ */
+ static String getGeoIpCountry(String ip) {
+
+ return ipLookup.countryLookup(ip);
+ }
+
+
+ /**
+ * Complete the RADIUS account via HBase
+ *
+ * @param ip client IP
+ * @return account
+ */
+ static String radiusMatch(String ip) {
+ String account = HBaseUtils.getAccount(ip.trim());
+ if (StringUtil.isBlank(account)) {
+ logger.warn("HashMap get account is null, Ip is :" + ip);
+ }
+ return account;
+ }
+
+ /**
+ * Complete appName from the cached appId mapping
+ *
+ * @param appIds app id list
+ * @return appName
+ */
+ static String appMatch(String appIds) {
+ try {
+ String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("Failed to split/convert the APP ID list; offending list: " + appIds);
+ return "";
+ }
+ }
+
+ /**
+ * Extract the top-level domain
+ *
+ * @param domain original domain
+ * @return top-level domain
+ */
+ static String getTopDomain(String domain) {
+ try {
+ return FormatUtils.getTopPrivateDomain(domain);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("Exception while extracting the top-level domain; offending domain: " + domain);
+ return "";
+ }
+ }
+
+ /**
+ * Decode base64 using the given charset
+ *
+ * @param message base64 string
+ * @param charset charset
+ * @return the decoded string
+ */
+ static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset == null) {
+ result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ } else {
+ result = Base64.decodeStr(message, charset.toString());
+ }
+ }
+ } catch (RuntimeException rune) {
+ logger.error("Exception while decoding Base64, exception info: " + rune);
+ }
+ return result;
+ }
+
+ /**
+ * Parse json with a path expression
+ *
+ * @param message json
+ * @param expr parse expression
+ * @return parse result
+ */
+ static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ if (StringUtil.isNotBlank(expr)) {
+ ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (!read.isEmpty()) {
+ flattenResult = read.get(0);
+ }
+ }
+ } catch (ClassCastException | InvalidPathException e) {
+ logger.error("Device tag parse exception; bad parse expression [ " + expr + " ]: " + e);
+ }
+ return flattenResult;
+ }
+
+
+ /**
+ * If the param names a log field, return that field's value; otherwise return the raw string
+ *
+ * @param object in-memory entity object
+ * @param param field name / plain string
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(Object object, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(object, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * If the param names a log field, return that field's value; otherwise return the raw string
+ *
+ * @param jsonMap parsed log map
+ * @param param field name / plain string
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(Map<String, Object> jsonMap, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(jsonMap, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF-function implementation: builds a ternary from the log; numeric results are converted to long before returning.
+ * ifParam format: "condition=value,resultA,resultB"
+ *
+ * @param object in-memory entity object
+ * @param ifParam field name / plain string
+ * @return resultA or resultB or null
+ */
+ static Object condition(Object object, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(object, norms[0]);
+ Object resultA = isJsonValue(object, split[1]);
+ Object resultB = isJsonValue(object, split[2]);
+ if (direction instanceof Number) {
+// result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
+ } else if (direction instanceof String) {
+ result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
+// result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF function execution exception, exception info: " + e);
+ }
+ return result;
+ }
+
+ /**
+ * IF-function implementation: builds a ternary from the log; numeric results are converted to long before returning.
+ * ifParam format: "condition=value,resultA,resultB"
+ *
+ * @param jsonMap parsed log map
+ * @param ifParam field name / plain string
+ * @return resultA or resultB or null
+ */
+ static Object condition(Map<String, Object> jsonMap, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, norms[0]);
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (direction instanceof Number) {
+ result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+// result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
+ } else if (direction instanceof String) {
+// result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
+ result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF function execution exception, exception info: " + e);
+ }
+ return result;
+ }
+
+// /**
+// * IF-function implementation (earlier String-typed variant, kept for reference)
+// *
+// * @param jsonMap raw log
+// * @param ifParam field name / plain string
+// * @return resultA or resultB or null
+// */
+// static Object condition(Map jsonMap, String ifParam) {
+// try {
+// String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+// String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+// String direction = isJsonValue(jsonMap, norms[0]);
+// if (StringUtil.isNotBlank(direction)) {
+// if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+// String resultA = isJsonValue(jsonMap, split[1]);
+// String resultB = isJsonValue(jsonMap, split[2]);
+// String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
+// Matcher isNum = PATTERN.matcher(result);
+// if (isNum.matches()) {
+// return Long.parseLong(result);
+// } else {
+// return result;
+// }
+// }
+// }
+// } catch (RuntimeException e) {
+// logger.error("IF function execution exception, exception info: " + e);
+// }
+// return null;
+// }
+
+ /**
+ * Set a fixed value; numeric strings are converted to long before returning
+ *
+ * @param param default value
+ * @return a number or a string
+ */
+ static Object setValue(String param) {
+ try {
+ Matcher isNum = PATTERN.matcher(param);
+ if (isNum.matches()) {
+ return Long.parseLong(param);
+ } else {
+ return param;
+ }
+ } catch (RuntimeException e) {
+ logger.error("SetValue function exception, exception info: " + e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
new file mode 100644
index 0000000..60b3d09
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -0,0 +1,202 @@
+package com.zdjizhi.utils.hbase;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.DefaultProConfig;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HBase utility class
+ *
+ * @author qidaijie
+ */
+
+public class HBaseUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334);
+ private static Connection connection;
+ private static Long time;
+
+ private static String zookeeperIp;
+ private static String hBaseTable;
+
+ private static HBaseUtils hBaseUtils;
+
+ private static void getInstance() {
+ hBaseUtils = new HBaseUtils();
+ }
+
+
+ /**
+ * Constructor: loads the cache and starts the scheduled refresh
+ */
+ private HBaseUtils() {
+ zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
+ hBaseTable = DefaultProConfig.HBASE_TABLE_NAME;
+ //get the connection
+ getConnection();
+ //pull the full mapping
+ getAll();
+ //scheduled refresh
+ updateCache();
+ }
+
+ private static void getConnection() {
+ try {
+ // HBase configuration
+ Configuration configuration = HBaseConfiguration.create();
+ // set the zookeeper quorum
+ configuration.set("hbase.zookeeper.quorum", zookeeperIp);
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ connection = ConnectionFactory.createConnection(configuration);
+ time = System.currentTimeMillis();
+ logger.warn("HBaseUtils get HBase connection,now to getAll().");
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+ }
+ }
+
+ /**
+ * Refresh the cached mapping
+ */
+ private static void change() {
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ long nowTime = System.currentTimeMillis();
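+ //Scan a slightly widened window (1s before the last sync, 0.5s past now) so edits near the boundary are not missed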
+ timestampsFilter(time - 1000, nowTime + 500);
+ }
+
+
+ /**
+ * Fetch the changes within a time range
+ *
+ * @param startTime start time
+ * @param endTime end time
+ */
+ private static void timestampsFilter(Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
+ Table table = null;
+ ResultScanner scanner = null;
+ Scan scan2 = new Scan();
+ try {
+ table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
+ scan2.setTimeRange(startTime, endTime);
+ scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ String key = Bytes.toString(CellUtil.cloneRow(cell)).trim();
+ String value = Bytes.toString(CellUtil.cloneValue(cell)).trim();
+ if (subIdMap.containsKey(key)) {
+ if (!value.equals(subIdMap.get(key))) {
+ subIdMap.put(key, value);
+ }
+ } else {
+ subIdMap.put(key, value);
+ }
+ }
+ }
+ Long end = System.currentTimeMillis();
+ logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
+ logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
+ time = endTime;
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ logger.error("HBase Table Close ERROR! Exception message is:" + e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all key-value pairs
+ */
+ private static void getAll() {
+ long begin = System.currentTimeMillis();
+ try {
+ Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
+ Scan scan2 = new Scan();
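+ //Unbounded scan: loads the entire "sub:<table>" mapping into memory at startup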
+ ResultScanner scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+ }
+ }
+ logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
+ logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
+ scanner.close();
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
+ }
+ }
+
+ /**
+ * Scheduled refresh: re-pulls HBase updates periodically
+ */
+ private void updateCache() {
+//        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
+//                new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
+ //scheduleAtFixedRate throws on a period of 0, so skip scheduling entirely when updates are disabled
+ if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS <= 0) {
+ return;
+ }
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ change();
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * Get the account for a client IP
+ *
+ * @param clientIp client_ip
+ * @return account
+ */
+ public static String getAccount(String clientIp) {
+
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ return subIdMap.get(clientIp);
+
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Utility class for fetching the gateway schema
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Issue a GET request to the gateway to fetch the schema
+ *
+ * @param http gateway url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
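+ //Read the body one character at a time and stop at the first newline; the endpoint is assumed to return single-line JSON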
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception message is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception message is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..bdcc43d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,283 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * Utility class for parsing json with FastJson
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+
+ private static final Log logger = LogFactory.get();
+
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+ /**
+ * Pattern match: given a type string, return the corresponding class
+ *
+ * @param type type name
+ * @return class type
+ */
+
+ private static Class<?> getClassName(String type) {
+ Class<?> clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+ /**
+ * Read a property value
+ *
+ * @param obj object
+ * @param property key
+ * @return the property value
+ */
+ public static Object getValue(Object obj, String property) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ return beanMap.get(property);
+ } catch (RuntimeException e) {
+ logger.error("Exception getting json value; offending key: " + property + "; exception info: " + e);
+ return null;
+ }
+ }
+
+ /**
+ * Read a property value
+ *
+ * @param jsonMap raw log map
+ * @param property key
+ * @return the property value
+ */
+ public static Object getValue(Map<String, Object> jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("Exception getting json value; offending key: " + property + "; exception info: " + e);
+ return null;
+ }
+ }
+
+ /**
+ * Update a property value
+ *
+ * @param jsonMap raw log json map
+ * @param property key to update
+ * @param value new value
+ */
+ public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
+ }
+ }
+
+ /**
+ * Update a property value on a generated bean.
+ *
+ * @param obj bean instance
+ * @param property key to update
+ * @param value new value
+ */
+ public static void setValue(Object obj, String property, Object value) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ beanMap.put(property, value);
+ } catch (ClassCastException e) {
+ logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
+ }
+ }
+
+ /**
+ * Generate an object via cglib reflection.
+ *
+ * @param properties map of property name to class, used to build the bean
+ * @return the generated Object instance
+ */
+ public static Object generateObject(Map<String, Class<?>> properties) {
+ BeanGenerator generator = new BeanGenerator();
+ for (String key : properties.keySet()) {
+ generator.addProperty(key, properties.get(key));
+ }
+ return generator.create();
+ }
+
+ /**
+ * Fetch the gateway schema from the given URL and build the map used to
+ * reflectively generate the schema object.
+ *
+ * @param http gateway schema URL
+ * @return a map from field name to class, used to generate the schema bean
+ */
+ public static HashMap<String, Class<?>> getMapFromHttp(String http) {
+ HashMap<String, Class<?>> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(http);
+ Object data = JSON.parseObject(schema).get("data");
+
+ //Get "fields" and treat it as an array; each element carries name/doc/type
+ JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+ String fieldStr = field.toString();
+ if (checkKeepField(fieldStr)) {
+ String name = JsonPath.read(fieldStr, "$.name").toString();
+ String type = JsonPath.read(fieldStr, "$.type").toString();
+ if (type.contains("{")) {
+ type = JsonPath.read(fieldStr, "$.type.type").toString();
+ }
+ //Assemble the map used to generate the entity class
+ map.put(name, getClassName(type));
+ } else {
+ //Record the field name so dropJsonField can remove it from raw logs
+ dropList.add(JsonPath.read(fieldStr, "$.name").toString());
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Decide whether a field should be kept.
+ *
+ * @param message a single field JSON
+ * @return true if the field should be kept
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
+ static void dropJsonField(Map<String, Object> jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
+
+ /**
+ * Fetch the schema via HTTP, parse it, and return a job list; each entry is
+ * [name, appendTo, function, param].
+ *
+ * @param http gateway URL
+ * @return the job list
+ */
+ public static ArrayList<String[]> getJobListFromHttp(String http) {
+ ArrayList<String[]> list = new ArrayList<>();
+
+ String schema = HttpClientUtil.requestByGetMethod(http);
+ //Parse "data"
+ Object data = JSON.parseObject(schema).get("data");
+
+ //Get "fields" and treat it as an array; each element carries name/doc/type
+ JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+
+ if (JSON.parseObject(field.toString()).containsKey("doc")) {
+ Object doc = JSON.parseObject(field.toString()).get("doc");
+
+ if (JSON.parseObject(doc.toString()).containsKey("format")) {
+ String name = JSON.parseObject(field.toString()).get("name").toString();
+ Object format = JSON.parseObject(doc.toString()).get("format");
+ JSONObject formatObject = JSON.parseObject(format.toString());
+
+ String functions = formatObject.get("functions").toString();
+ String appendTo = null;
+ String params = null;
+
+ if (formatObject.containsKey("appendTo")) {
+ appendTo = formatObject.get("appendTo").toString();
+ }
+
+ if (formatObject.containsKey("param")) {
+ params = formatObject.get("param").toString();
+ }
+
+
+ if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], null});
+ }
+
+ } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
+
+ }
+ } else {
+ list.add(new String[]{name, name, functions, params});
+ }
+
+ }
+ }
+
+ }
+ return list;
+ }
+
+}
\ No newline at end of file
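A minimal, self-contained sketch of the cglib round-trip above (generateObject, setValue, getValue); the property names are illustrative only:

    import java.util.HashMap;
    import java.util.Map;
    import com.zdjizhi.utils.json.JsonParseUtil;

    public class BeanRoundTripSketch {
        public static void main(String[] args) {
            Map<String, Class<?>> props = new HashMap<>();
            props.put("host", String.class);
            props.put("port", Integer.class);

            // cglib generates a bean class with these two properties at runtime
            Object bean = JsonParseUtil.generateObject(props);
            JsonParseUtil.setValue(bean, "host", "example.org");
            System.out.println(JsonParseUtil.getValue(bean, "host")); // example.org
        }
    }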
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
new file mode 100644
index 0000000..0b6bc1e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
@@ -0,0 +1,187 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/12 17:34
+ */
+public class JsonTypeUtils {
+ private static final Log logger = LogFactory.get();
+ /**
+ * Map for the reflected class, loaded in memory at class-init time
+ */
+ private static HashMap<String, Class<?>> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Type conversion: keep only schema fields and coerce each value.
+ *
+ * @param jsonMap raw log map
+ */
+ public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
+ JsonParseUtil.dropJsonField(jsonMap);
+ HashMap<String, Object> tmpMap = new HashMap<>(192);
+ for (String key : jsonMap.keySet()) {
+ if (map.containsKey(key)) {
+ String simpleName = map.get(key).getSimpleName();
+ switch (simpleName) {
+ case "String":
+ tmpMap.put(key, checkString(jsonMap.get(key)));
+ break;
+ case "Integer":
+ tmpMap.put(key, getIntValue(jsonMap.get(key)));
+ break;
+ case "long":
+ tmpMap.put(key, checkLongValue(jsonMap.get(key)));
+ break;
+ case "List":
+ tmpMap.put(key, checkArray(jsonMap.get(key)));
+ break;
+ case "Map":
+ tmpMap.put(key, checkObject(jsonMap.get(key)));
+ break;
+ case "double":
+ tmpMap.put(key, checkDouble(jsonMap.get(key)));
+ break;
+ default:
+ tmpMap.put(key, checkString(jsonMap.get(key)));
+ }
+ }
+ }
+ return tmpMap;
+ }
+
+ /**
+ * String type check/convert method.
+ *
+ * @param value json value
+ * @return String value
+ */
+ private static String checkString(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map){
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List){
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * Object (map) type check/convert method.
+ *
+ * @param value json value
+ * @return Map value
+ */
+ private static Map checkObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return (Map) value;
+ }
+
+ throw new FlowWriteException("can not cast to map, value : " + value);
+ }
+
+ /**
+ * Array (list) type check/convert method.
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static List checkArray(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return (List) value;
+ }
+
+ throw new FlowWriteException("can not cast to List, value : " + value);
+ }
+
+ private static Long checkLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToLong(value);
+ }
+
+ /**
+ * long type check/convert method; returns 0L when the value is null.
+ *
+ * @param value json value
+ * @return Long value
+ */
+ private static long checkLongValue(Object value) {
+ Long longVal = TypeUtils.castToLong(value);
+ if (longVal == null) {
+ return 0L;
+ }
+
+// return longVal.longValue();
+ return longVal;
+ }
+
+ /**
+ * Double type check/convert method.
+ *
+ * @param value json value
+ * @return Double value
+ */
+ private static Double checkDouble(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToDouble(value);
+ }
+
+
+ private static Integer checkInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToInt(value);
+ }
+
+
+ /**
+ * int type check/convert method; returns 0 when the value is null.
+ *
+ * @param value json value
+ * @return int value
+ */
+ private static int getIntValue(Object value) {
+
+ Integer intVal = TypeUtils.castToInt(value);
+ if (intVal == null) {
+ return 0;
+ }
+
+// return intVal.intValue();
+ return intVal;
+ }
+
+}
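typeTransform cannot run standalone (its schema map is fetched from FlowWriteConfig.SCHEMA_HTTP at class-init time), so the sketch below inlines a two-field schema to show the same keep-and-coerce behavior:

    import java.util.HashMap;
    import java.util.Map;

    public class TypeTransformSketch {
        public static void main(String[] args) {
            Map<String, Class<?>> schema = new HashMap<>();
            schema.put("host", String.class);
            schema.put("bytes", long.class);

            Map<String, Object> log = new HashMap<>();
            log.put("host", "example.org");
            log.put("bytes", "4096");
            log.put("extra", true); // not in the schema: dropped

            Map<String, Object> out = new HashMap<>();
            for (Map.Entry<String, Object> e : log.entrySet()) {
                Class<?> target = schema.get(e.getKey());
                if (target == null) {
                    continue;
                }
                out.put(e.getKey(), target == long.class
                        ? (Object) Long.parseLong(e.getValue().toString())
                        : e.getValue().toString());
            }
            System.out.println(out); // {host=example.org, bytes=4096}
        }
    }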
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..b13627f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,171 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/12 18:20
+ */
+public class TypeUtils {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Normalize a value for the "if" function: pass String/Integer/Long through
+ * and map Boolean to 1/0.
+ *
+ * @param value json value
+ * @return the normalized value, or null
+ */
+ public static Object castToIfFunction(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value.toString();
+ }
+
+ if (value instanceof Integer) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof Long) {
+ return ((Number) value).longValue();
+ }
+
+// if (value instanceof Map) {
+// return (Map) value;
+// }
+//
+// if (value instanceof List) {
+// return Collections.singletonList(value.toString());
+// }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Integer type check method.
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ static Integer castToInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Convert values like "10,20" to 10 (keep the first token)
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Integer.parseInt(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Integer Error,The error Str is:" + strVal);
+ }
+ }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Double type check method.
+ *
+ * @param value json value
+ * @return double value or null
+ */
+ static Double castToDouble(Object value) {
+
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Convert values like "10,20" to 10 (keep the first token)
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Double.parseDouble(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Double Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to double, value : " + value);
+ }
+
+ /**
+ * Long type check method.
+ *
+ * @param value json value
+ * @return (Long)value or null
+ */
+ static Long castToLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //Convert values like "10,20" to 10 (keep the first token)
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Long.parseLong(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Long Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to long, value : " + value);
+ }
+
+}
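The three casts above share the same lenient rule: a string like "10,20" is truncated to its first token before parsing. A standalone sketch of that rule (assuming FORMAT_SPLITTER is ","; the constant's value is not shown in this patch):

    import java.util.Arrays;

    public class CastSketch {
        static Integer castToInt(Object value) {
            if (value == null) return null;
            if (value instanceof Number) return ((Number) value).intValue();
            String s = value.toString();
            if (s.contains(",")) s = s.split(",")[0]; // keep the first token
            try {
                return Integer.parseInt(s.trim());
            } catch (NumberFormatException e) {
                return null; // the real code logs and falls through to an exception
            }
        }

        public static void main(String[] args) {
            System.out.println(Arrays.asList(castToInt("10,20"), castToInt(7L), castToInt(null)));
            // [10, 7, null]
        }
    }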
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..c220064
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 13:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", FlowWriteConfig.GROUP_ID);
+ properties.put("session.timeout.ms", "60000");
+ properties.put("max.poll.records", "3000");
+ properties.put("max.partition.fetch.bytes", "31457280");
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ /*
+ * Kafka rate-limiting config - 20201117
+ */
+// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID);
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
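A hedged wiring sketch for the consumer (topic, group id and brokers come from FlowWriteConfig, so the gateway config must resolve for this to run):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.zdjizhi.utils.kafka.Consumer;

    public class ConsumerSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // print() is a throwaway sink, only to verify records arrive
            env.addSource(Consumer.getKafkaConsumer()).print();
            env.execute("consumer-smoke-test");
        }
    }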
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..077ae71
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,53 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.DefaultProConfig;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 14:04
+ */
+public class Producer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
+// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
+ properties.put("retries", DefaultProConfig.RETRIES);
+ properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+ properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+ properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
+
+ /*
+ * Kafka rate-limiting config - 20201117
+ */
+// properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
+// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+ FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig());
+
+ kafkaProducer.setLogFailuresOnly(false);
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+ return kafkaProducer;
+ }
+}
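And the matching sink-side sketch (a single literal record; the output topic and brokers again come from FlowWriteConfig):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.zdjizhi.utils.kafka.Producer;

    public class ProducerSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("{\"smoke\":\"test\"}")
                    .addSink(Producer.getKafkaProducer());
            env.execute("producer-smoke-test");
        }
    }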
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
new file mode 100644
index 0000000..08fa29b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -0,0 +1,70 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class FlowWriteConfigurations {
+
+ private static Properties propKafka = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propKafka.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propKafka.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propKafka = null;
+ propService = null;
+ }
+ }
+}
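Usage sketch: type 0 reads service_flow_config.properties, type 1 reads default_config.properties; the keys below are taken from the properties files in this patch:

    import com.zdjizhi.utils.system.FlowWriteConfigurations;

    public class ConfigReadSketch {
        public static void main(String[] args) {
            // both keys live in default_config.properties (type 1)
            String table = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
            Integer retries = FlowWriteConfigurations.getIntProperty(1, "retries");
            System.out.println(table + " / " + retries); // subscriber_info / 0
        }
    }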
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..2afab03
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
@@ -0,0 +1,190 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author qidaijie
+ */
+public class DistributedLock implements Lock, Watcher {
+ private static final Log logger = LogFactory.get();
+
+ private ZooKeeper zk = null;
+ /**
+ * Root lock node
+ */
+ private final String ROOT_LOCK = "/locks";
+ /**
+ * The contended resource
+ */
+ private String lockName;
+ /**
+ * The previous lock node to wait on
+ */
+ private String waitLock;
+ /**
+ * The current lock node
+ */
+ private String currentLock;
+ /**
+ * Latch used while waiting for the previous node
+ */
+ private CountDownLatch countDownLatch;
+
+ private int sessionTimeout = 2000;
+
+ private List<Exception> exceptionList = new ArrayList<>();
+
+ /**
+ * Configure the distributed lock.
+ *
+ * @param config ZooKeeper connection string
+ * @param lockName contended resource name
+ */
+ public DistributedLock(String config, String lockName) {
+ this.lockName = lockName;
+ try {
+ // Connect to ZooKeeper
+ zk = new ZooKeeper(config, sessionTimeout, this);
+ Stat stat = zk.exists(ROOT_LOCK, false);
+ if (stat == null) {
+ // Create the root node if it does not exist
+ zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (IOException | InterruptedException | KeeperException e) {
+ logger.error("Node already exists!");
+ }
+ }
+
+ // Node watcher
+ @Override
+ public void process(WatchedEvent event) {
+ if (this.countDownLatch != null) {
+ this.countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void lock() {
+ if (exceptionList.size() > 0) {
+ throw new LockException(exceptionList.get(0));
+ }
+ try {
+ if (this.tryLock()) {
+ logger.info(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
+ } else {
+ // Wait for the lock
+ waitForLock(waitLock, sessionTimeout);
+ }
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("鑾峰彇閿佸紓甯" + e);
+ }
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ String splitStr = "_lock_";
+ if (lockName.contains(splitStr)) {
+ throw new LockException("閿佸悕鏈夎");
+ }
+ // Create an ephemeral sequential node
+ currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ // Get all children of the root
+ List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
+ // Collect all lock nodes with this lockName
+ List<String> lockObjects = new ArrayList<>();
+ for (String node : subNodes) {
+ String tmpNode = node.split(splitStr)[0];
+ if (tmpNode.equals(lockName)) {
+ lockObjects.add(node);
+ }
+ }
+ Collections.sort(lockObjects);
+ // If the current node is the smallest, the lock is acquired
+ if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
+ return true;
+ }
+ // Otherwise, find the node immediately before ours and wait on it
+ String currentNodeName = currentLock.substring(currentLock.lastIndexOf("/") + 1);
+ waitLock = lockObjects.get(Collections.binarySearch(lockObjects, currentNodeName) - 1);
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("鑾峰彇閿佽繃绋嬪紓甯" + e);
+ }
+ return false;
+ }
+
+
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) {
+ try {
+ if (this.tryLock()) {
+ return true;
+ }
+ return waitForLock(waitLock, timeout);
+ } catch (KeeperException | InterruptedException | RuntimeException e) {
+ logger.error("鍒ゆ柇鏄惁閿佸畾寮傚父" + e);
+ }
+ return false;
+ }
+
+ // Wait for the lock
+ private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
+
+ if (stat != null) {
+ this.countDownLatch = new CountDownLatch(1);
+ // Wait on the latch; when the previous node disappears, process() counts down and the lock can be taken
+ this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
+ this.countDownLatch = null;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ zk.delete(currentLock, -1);
+ currentLock = null;
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("鍏抽棴閿佸紓甯" + e);
+ }
+ }
+
+ @Override
+ public Condition newCondition() {
+ return null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ this.lock();
+ }
+
+
+ public class LockException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockException(String e) {
+ super(e);
+ }
+
+ public LockException(Exception e) {
+ super(e);
+ }
+ }
+
+}
\ No newline at end of file
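Typical use of the lock (the ZooKeeper address is a placeholder; note that unlock() closes the ZooKeeper session, so each DistributedLock instance is single-use):

    import java.util.concurrent.locks.Lock;
    import com.zdjizhi.utils.zookeeper.DistributedLock;

    public class WorkerIdLockSketch {
        public static void main(String[] args) {
            Lock lock = new DistributedLock("192.168.44.11:2181", "workerId");
            lock.lock();
            try {
                // critical section, e.g. read-increment-write of a shared counter
            } finally {
                lock.unlock(); // deletes the ephemeral node and closes the session
            }
        }
    }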
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
new file mode 100644
index 0000000..ebf4368
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
@@ -0,0 +1,139 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author qidaijie
+ * @Package cn.ac.iie.utils.zookeeper
+ * @Description:
+ * @date 2020/11/14 11:28
+ */
+public class ZookeeperUtils implements Watcher {
+ private static final Log logger = LogFactory.get();
+
+ private ZooKeeper zookeeper;
+
+ private static final int SESSION_TIME_OUT = 20000;
+
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ countDownLatch.countDown();
+ }
+ }
+
+
+ /**
+ * Modify node data: read the worker-id counter, write back the incremented
+ * value (wrapping after 63), and return the value read.
+ *
+ * @param path node path
+ * @param zookeeperIp ZooKeeper address
+ * @return the worker id read from the node
+ */
+ public int modifyNode(String path, String zookeeperIp) {
+ createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
+ int workerId = 0;
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat stat = zookeeper.exists(path, true);
+ workerId = Integer.parseInt(getNodeData(path));
+ if (workerId > 63) {
+ workerId = 0;
+ zookeeper.setData(path, "1".getBytes(), stat.getVersion());
+ } else {
+ String result = String.valueOf(workerId + 1);
+ if (stat != null) {
+ zookeeper.setData(path, result.getBytes(), stat.getVersion());
+ } else {
+ logger.error("Node does not exist!,Can't modify");
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("modify error Can't modify," + e);
+ } finally {
+ closeConn();
+ }
+ logger.warn("workerID is锛" + workerId);
+ return workerId;
+ }
+
+ /**
+ * Connect to ZooKeeper.
+ *
+ * @param host address
+ */
+ public void connectZookeeper(String host) {
+ try {
+ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
+ countDownLatch.await();
+ } catch (IOException | InterruptedException e) {
+ logger.error("Connection to the Zookeeper Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Close the connection
+ */
+ public void closeConn() {
+ try {
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Close the Zookeeper connection Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Get node content.
+ *
+ * @param path node path
+ * @return the content, or null on error
+ */
+ public String getNodeData(String path) {
+ String result = null;
+ Stat stat = new Stat();
+ try {
+ byte[] resByte = zookeeper.getData(path, true, stat);
+
+ result = StrUtil.str(resByte, "UTF-8");
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("Get node information exception" + e);
+ }
+ return result;
+ }
+
+ /**
+ * @param path path at which to create the node
+ * @param data byte[] stored at the node
+ * @param acls ACL policy
+ * @param zookeeperIp ZooKeeper address
+ */
+ public void createNode(String path, byte[] data, List<ACL> acls, String zookeeperIp) {
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat exists = zookeeper.exists(path, true);
+ if (exists == null) {
+ Stat existsSnowflakeId = zookeeper.exists("/Snowflake", true);
+ if (existsSnowflakeId == null) {
+ zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
+ }
+ zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
+ } else {
+ logger.warn("Node already exists ! Don't need to create");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error(e);
+ } finally {
+ closeConn();
+ }
+ }
+}
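A hedged sketch of allocating a Snowflake worker id with the utility above (node path and address are placeholders; modifyNode creates /Snowflake and the counter node on first use and wraps the counter after 63):

    import com.zdjizhi.utils.zookeeper.ZookeeperUtils;

    public class WorkerIdSketch {
        public static void main(String[] args) {
            ZookeeperUtils zkUtils = new ZookeeperUtils();
            int workerId = zkUtils.modifyNode("/Snowflake/log-completion", "192.168.44.11:2181");
            System.out.println("allocated workerId = " + workerId);
        }
    }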
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console log settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File log settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#Use a relative path so test output lands under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis config; com.nis.web.dao is the package containing the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=debug
+#BoneCP data source config
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/main/logback.xml b/src/main/logback.xml
new file mode 100644
index 0000000..a508b6b
--- /dev/null
+++ b/src/main/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <property name="LOG_PATTERN"
+              value="[%d{yyyy-MM-dd HH:mm:ss}] [%-5level] [Thread:%thread] %logger{36} - %msg%n"/>
+    <property name="LOG_FILE_PATH" value="log/log-completion-schema.log"/>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${LOG_PATTERN}</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${LOG_FILE_PATH}</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_FILE_PATH}.%d{yyyy-MM-dd}.%i</fileNamePattern>
+            <!-- keep 30 days of history; roll files at 20MB (values recovered from the stripped original) -->
+            <maxHistory>30</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>20MB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${LOG_PATTERN}</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="FILE"/>
+    </root>
+
+</configuration>
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/KafkaLogSend.java b/src/test/java/com/zdjizhi/KafkaLogSend.java
new file mode 100644
index 0000000..5c3feb3
--- /dev/null
+++ b/src/test/java/com/zdjizhi/KafkaLogSend.java
@@ -0,0 +1,92 @@
+package com.zdjizhi;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.DefaultProConfig;
+import org.apache.kafka.clients.producer.*;
+
+import java.util.Properties;
+
+/**
+ * Writes logs produced by the NTC system configuration to the data center.
+ *
+ * @author Administrator
+ * @create 2018-08-13 15:11
+ */
+
+public class KafkaLogSend {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Kafka producer used to send messages to Kafka
+ */
+ private static org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer;
+
+ /**
+ * Kafka producer adapter (singleton) that proxies message sending
+ */
+ private static KafkaLogSend kafkaLogSend;
+
+ private KafkaLogSend() {
+ initKafkaProducer();
+ }
+
+ public static KafkaLogSend getInstance() {
+ if (kafkaLogSend == null) {
+ kafkaLogSend = new KafkaLogSend();
+ }
+ return kafkaLogSend;
+ }
+
+
+ public void sendMessage(String message) {
+// for (String value : list) {
+ kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("鍐欏叆test鍑虹幇寮傚父", exception);
+ }
+ }
+ });
+// }
+// kafkaProducer.flush();
+ logger.debug("Log sent to National Center successfully!!!!!");
+ }
+
+ /**
+ * Initialize the Kafka message producer from the producer config; runs only once
+ */
+ private void initKafkaProducer() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+ properties.put("retries", DefaultProConfig.RETRIES);
+ properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+ properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+ properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
+
+ properties.put("security.protocol", "SSL");
+ properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
+ properties.put("ssl.keystore.password", "ceiec2019");
+ properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
+ properties.put("ssl.truststore.password", "ceiec2019");
+ properties.put("ssl.key.password", "ceiec2019");
+
+
+ /*
+ * Kafka rate-limiting config - 20201117
+ */
+// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID);
+// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+
+}
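The singleton above is lazily initialized without synchronization, which is acceptable for these single-threaded tests; a minimal driver (topic "test", brokers and SSL paths are hard-coded in initKafkaProducer):

    import com.zdjizhi.KafkaLogSend;

    public class SendOneMessage {
        public static void main(String[] args) {
            KafkaLogSend.getInstance().sendMessage("{\"smoke\":\"test\"}");
        }
    }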
diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java
new file mode 100644
index 0000000..3bb6d1c
--- /dev/null
+++ b/src/test/java/com/zdjizhi/KafkaTest.java
@@ -0,0 +1,53 @@
+package com.zdjizhi;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.kafka.clients.producer.*;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/8/2 17:39
+ */
+public class KafkaTest {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+// properties.put("retries", DefaultProConfig.RETRIES);
+// properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+// properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+// properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+// properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+// properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
+
+ properties.put("security.protocol", "SSL");
+// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
+ properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks");
+ properties.put("ssl.keystore.password", "ceiec2019");
+// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
+ properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks");
+ properties.put("ssl.truststore.password", "ceiec2019");
+ properties.put("ssl.key.password", "ceiec2019");
+
+ Producer<String, String> producer = new KafkaProducer<>(properties);
+
+ producer.send(new ProducerRecord<>("test", "hello!"), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("鍐欏叆test鍑虹幇寮傚父", exception);
+ }
+ }
+ });
+
+ producer.close();
+ }
+}
diff --git a/src/test/java/com/zdjizhi/LocationTest.java b/src/test/java/com/zdjizhi/LocationTest.java
new file mode 100644
index 0000000..e7b2d15
--- /dev/null
+++ b/src/test/java/com/zdjizhi/LocationTest.java
@@ -0,0 +1,28 @@
+package com.zdjizhi;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.IpLookup;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/8/18 11:34
+ */
+public class LocationTest {
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v4.mmdb")
+ .loadDataFileV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v6.mmdb")
+ .loadDataFilePrivateV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v4.mmdb")
+ .loadDataFilePrivateV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v6.mmdb")
+ .build();
+
+ @Test
+ public void IpLocationTest() {
+ System.out.println(ipLookup.cityLookupDetail("24.241.112.0"));
+ System.out.println(ipLookup.cityLookupDetail("1.1.1.1"));
+ System.out.println(ipLookup.cityLookupDetail("192.168.50.58"));
+ System.out.println(ipLookup.cityLookupDetail("2600:1700:9010::"));
+ }
+}