From fcd97b7aabe48c5ce73403059481acb0040b3243 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 23 Aug 2021 17:05:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E6=95=B0=E6=8D=AE=E9=A2=84?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=A8=8B=E5=BA=8F=E5=88=9D=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 301 +++++++++++++++++ properties/default_config.properties | 29 ++ properties/service_flow_config.properties | 74 ++++ .../com/zdjizhi/common/DefaultProConfig.java | 21 ++ .../com/zdjizhi/common/FlowWriteConfig.java | 59 ++++ .../topology/LogFlowWriteTopology.java | 56 ++++ .../java/com/zdjizhi/utils/app/AppUtils.java | 123 +++++++ .../utils/exception/FlowWriteException.java | 18 + .../utils/functions/FilterNullFunction.java | 17 + .../utils/functions/MapCompletedFunction.java | 28 ++ .../zdjizhi/utils/general/SnowflakeId.java | 213 ++++++++++++ .../zdjizhi/utils/general/TransFormMap.java | 144 ++++++++ .../utils/general/TransFormObject.java | 153 +++++++++ .../utils/general/TransFormTypeMap.java | 145 ++++++++ .../zdjizhi/utils/general/TransFunction.java | 317 ++++++++++++++++++ .../com/zdjizhi/utils/hbase/HBaseUtils.java | 202 +++++++++++ .../zdjizhi/utils/http/HttpClientUtil.java | 77 +++++ .../com/zdjizhi/utils/json/JsonParseUtil.java | 283 ++++++++++++++++ .../com/zdjizhi/utils/json/JsonTypeUtils.java | 187 +++++++++++ .../com/zdjizhi/utils/json/TypeUtils.java | 171 ++++++++++ .../com/zdjizhi/utils/kafka/Consumer.java | 44 +++ .../com/zdjizhi/utils/kafka/Producer.java | 53 +++ .../utils/system/FlowWriteConfigurations.java | 70 ++++ .../utils/zookeeper/DistributedLock.java | 190 +++++++++++ .../utils/zookeeper/ZookeeperUtils.java | 139 ++++++++ src/main/log4j.properties | 25 ++ src/main/logback.xml | 42 +++ src/test/java/com/zdjizhi/KafkaLogSend.java | 92 +++++ src/test/java/com/zdjizhi/KafkaTest.java | 53 +++ src/test/java/com/zdjizhi/LocationTest.java | 28 ++ 30 files changed, 3354 insertions(+) create mode 100644 pom.xml create mode 100644 properties/default_config.properties create mode 100644 properties/service_flow_config.properties create mode 100644 src/main/java/com/zdjizhi/common/DefaultProConfig.java create mode 100644 src/main/java/com/zdjizhi/common/FlowWriteConfig.java create mode 100644 src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java create mode 100644 src/main/java/com/zdjizhi/utils/app/AppUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/general/SnowflakeId.java create mode 100644 src/main/java/com/zdjizhi/utils/general/TransFormMap.java create mode 100644 src/main/java/com/zdjizhi/utils/general/TransFormObject.java create mode 100644 src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java create mode 100644 src/main/java/com/zdjizhi/utils/general/TransFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java create mode 100644 src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java create mode 100644 src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/json/TypeUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Consumer.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Producer.java create mode 100644 src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java create mode 100644 src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java create mode 100644 src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java create mode 100644 src/main/log4j.properties create mode 100644 src/main/logback.xml create mode 100644 src/test/java/com/zdjizhi/KafkaLogSend.java create mode 100644 src/test/java/com/zdjizhi/KafkaTest.java create mode 100644 src/test/java/com/zdjizhi/LocationTest.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c7ddb06 --- /dev/null +++ b/pom.xml @@ -0,0 +1,301 @@ + + + + 4.0.0 + + com.zdjizhi + log-completion-schema + 20210728 + + log-completion-schema + http://www.example.com + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + + fail + + + + + + UTF-8 + 1.13.1 + 2.7.1 + 1.0.0 + 2.2.3 + provided + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + com.zdjizhi.topology.LogFlowWriteTopology + + + + + + + + + io.github.zlika + reproducible-build-maven-plugin + 0.2 + + + + strip-jar + + package + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + src\main\java + + log4j.properties + + false + + + + + + + + com.zdjizhi + galaxy + 1.0.6 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + com.alibaba + fastjson + 1.2.70 + + + + + org.apache.flink + flink-table + ${flink.version} + pom + ${scope.type} + + + + + org.apache.flink + flink-core + ${flink.version} + ${scope.type} + + + + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + + + + + + org.apache.flink + flink-clients_2.11 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-connector-kafka_2.11 + ${flink.version} + + + + + + org.apache.flink + flink-java + ${flink.version} + ${scope.type} + + + + + org.apache.zookeeper + zookeeper + 3.4.10 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + cglib + cglib-nodep + 3.2.4 + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + org.apache.httpcomponents + httpclient + 4.5.2 + + + + com.jayway.jsonpath + json-path + 2.4.0 + + + + io.prometheus + simpleclient_pushgateway + 0.9.0 + + + + cn.hutool + hutool-all + 5.5.2 + + + + junit + junit + 4.12 + test + + + + + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..d82130d --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer重试的次数设置 +retries=0 + +#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了 +linger.ms=10 + +#如果在超时之前未收到响应,客户端将在必要时重新发送请求 +request.timeout.ms=30000 + +#producer都是按照batch进行发送的,批次大小,默认:16384 +batch.size=262144 + +#Producer端用于缓存消息的缓冲区大小 +#64M +#buffer.memory=67108864 +#128M +buffer.memory=134217728 + +#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 +#5M +#max.request.size=5242880 +#10M +max.request.size=10485760 + +#hbase table name +hbase.table.name=subscriber_info + +#邮件默认编码 +mail.default.charset=UTF-8 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..9bb2f84 --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,74 @@ +#--------------------------------鍦板潃閰嶇疆------------------------------# + +#绠$悊kafka鍦板潃 +input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 + +#绠$悊杈撳嚭kafka鍦板潃 +output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 + +#zookeeper 鍦板潃 鐢ㄤ簬閰嶇疆log_id +zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 + +#hbase zookeeper鍦板潃 鐢ㄤ簬杩炴帴HBase +hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 + +#--------------------------------HTTP/瀹氫綅搴------------------------------# +#瀹氫綅搴撳湴鍧 +ip.library=/home/bigdata/topology/dat/ + +#缃戝叧鐨剆chema浣嶇疆 +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log + +#缃戝叧APP_ID 鑾峰彇鎺ュ彛 +app.id.http=http://192.168.44.67:9999/open-api/appDicList + +#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# + +#kafka 鎺ユ敹鏁版嵁topic +input.kafka.topic=CONNECTION-RECORD-LOG + +#琛ュ叏鏁版嵁 杈撳嚭 topic +output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG + +#璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 +group.id=connection-record-flink-20210809 + +#鐢熶骇鑰呭帇缂╂ā寮 none or snappy +producer.kafka.compression.type=none + +#鐢熶骇鑰卆ck +producer.ack=1 + +#鎺ユ敹鑷猭afka鐨勬秷璐硅 client-id +consumer.client.id=consumer-connection-record + +#鍥炲啓缁檏afka鐨勭敓浜ц client-id +producer.client.id=producer-connection-record + +#--------------------------------topology閰嶇疆------------------------------# + +#consumer 骞惰搴 +consumer.parallelism=3 + +#map鍑芥暟骞惰搴 +map.parallelism=3 + +#producer 骞惰搴 +producer.parallelism=3 + +#鏁版嵁涓績锛屽彇鍊艰寖鍥(0-63) +data.center.id.num=0 + +#hbase 鏇存柊鏃堕棿锛屽濉啓0鍒欎笉鏇存柊缂撳瓨 +hbase.tick.tuple.freq.secs=180 + +#app_id 鏇存柊鏃堕棿锛屽濉啓0鍒欎笉鏇存柊缂撳瓨 +app.tick.tuple.freq.secs=0 + +#--------------------------------榛樿鍊奸厤缃------------------------------# + +#閭欢榛樿缂栫爜 +mail.default.charset=UTF-8 + +#0涓嶉渶瑕佽ˉ鍏ㄥ師鏍疯緭鍑烘棩蹇楋紝1闇瑕佽ˉ鍏 +log.need.complete=1 diff --git a/src/main/java/com/zdjizhi/common/DefaultProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java new file mode 100644 index 0000000..b98ea53 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java @@ -0,0 +1,21 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class DefaultProConfig { + + + public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java new file mode 100644 index 0000000..bf82757 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -0,0 +1,59 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + public static final int IF_PARAM_LENGTH = 3; + public static final String VISIBILITY = "disabled"; + public static final String FORMAT_SPLITTER = ","; + public static final String IS_JSON_KEY_TAG = "$."; + public static final String IF_CONDITION_SPLITTER = "="; + public static final String MODEL = "remote"; + public static final String PROTOCOL_SPLITTER = "\\."; + /** + * System + */ + public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism"); + public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism"); + public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism"); + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); + + + + /** + * kafka + */ + public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack"); + public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); + + /** + * kafka闄愭祦閰嶇疆-20201117 + */ + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id"); + public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id"); + + /** + * http + */ + public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); + public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http"); + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..a9b38ca --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -0,0 +1,56 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class LogFlowWriteTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + //寮鍚疌heckpoint锛宨nterval鐢ㄤ簬鎸囧畾checkpoint鐨勮Е鍙戦棿闅(鍗曚綅milliseconds) +// environment.enableCheckpointing(5000); + + DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer()) + .setParallelism(FlowWriteConfig.CONSUMER_PARALLELISM); + + if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { + //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑 + DataStream cleaningLog = streamSource.map(new MapCompletedFunction()) + .name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM); + //杩囨护绌烘暟鎹笉鍙戦佸埌Kafka鍐 + DataStream result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData"); + //鍙戦佹暟鎹埌Kafka + result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka") + .setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM); + } else { + DataStream result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData"); + result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM); + } + + try { + environment.execute(args[0]); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java new file mode 100644 index 0000000..0caeb25 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java @@ -0,0 +1,123 @@ +package com.zdjizhi.utils.app; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.http.HttpClientUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * AppId 宸ュ叿绫 + * + * @author qidaijie + */ + +public class AppUtils { + private static final Log logger = LogFactory.get(); + private static Map appIdMap = new ConcurrentHashMap<>(128); + private static AppUtils appUtils; + + private static void getAppInstance() { + appUtils = new AppUtils(); + } + + + /** + * 鏋勯犲嚱鏁-鏂 + */ + private AppUtils() { + //瀹氭椂鏇存柊 + updateAppIdCache(); + } + + /** + * 鏇存柊鍙橀噺 + */ + private static void change() { + if (appUtils == null) { + getAppInstance(); + } + timestampsFilter(); + } + + + /** + * 鑾峰彇鍙樻洿鍐呭 + */ + private static void timestampsFilter() { + try { + Long begin = System.currentTimeMillis(); + String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP); + if (StringUtil.isNotBlank(schema)) { + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + int key = jsonArray.getInteger(0); + String value = jsonArray.getString(1); + if (appIdMap.containsKey(key)) { + if (!value.equals(appIdMap.get(key))) { + appIdMap.put(key, value); + } + } else { + appIdMap.put(key, value); + } + } + logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis())); + logger.warn("Pull the length of the interface data:[" + objects.size() + "]"); + } + } catch (RuntimeException e) { + logger.error("Update cache app-id failed, exception锛" + e); + } + } + + + /** + * 楠岃瘉瀹氭椂鍣,姣忛殧涓娈垫椂闂撮獙璇佷竴娆-楠岃瘉鑾峰彇鏂扮殑Cookie + */ + private void updateAppIdCache() { + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } + } catch (RuntimeException e) { + logger.error("AppUtils update AppCache is error===>{" + e + "}<==="); + } + } + }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + } + + + /** + * 鑾峰彇 appName + * + * @param appId app_id + * @return account + */ + public static String getAppName(int appId) { + + if (appUtils == null) { + getAppInstance(); + } + + if (appIdMap.containsKey(appId)) { + return appIdMap.get(appId); + } else { + logger.warn("AppMap get appName is null, ID is :" + appId); + return ""; + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java new file mode 100644 index 0000000..67c88f0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class FlowWriteException extends RuntimeException { + + public FlowWriteException() { + } + + public FlowWriteException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java new file mode 100644 index 0000000..5618159 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -0,0 +1,28 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.flink.api.common.functions.MapFunction; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompletedFunction implements MapFunction { + private static final Log logger = LogFactory.get(); + + @Override + @SuppressWarnings("unchecked") + public String map(String logs) { + try { + return TransFormTypeMap.dealCommonMessage(logs); + } catch (RuntimeException e) { + logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e + "\n" + logs); + return ""; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java new file mode 100644 index 0000000..d203a2b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -0,0 +1,213 @@ +package com.zdjizhi.utils.general; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.zookeeper.DistributedLock; +import com.zdjizhi.utils.zookeeper.ZookeeperUtils; + +/** + * 闆姳绠楁硶 + * + * @author qidaijie + */ +public class SnowflakeId { + private static final Log logger = LogFactory.get(); + + /** + * 鍏64浣 绗竴浣嶄负绗﹀彿浣 榛樿0 + * 鏃堕棿鎴 39浣(17 year), centerId:(鍏宠仈姣忎釜鐜鎴栦换鍔℃暟) :6浣(0-63), + * workerId(鍏宠仈杩涚▼):7(0-127) ,搴忓垪鍙凤細11浣(2047/ms) + * + * 搴忓垪鍙 /ms = (-1L ^ (-1L << 11)) + * 鏈澶т娇鐢ㄥ勾 = (1L << 39) / (1000L * 60 * 60 * 24 * 365) + */ + /** + * 寮濮嬫椂闂存埅 (2020-11-14 00:00:00) max 17years + */ + private final long twepoch = 1605283200000L; + + /** + * 鏈哄櫒id鎵鍗犵殑浣嶆暟 + */ + private final long workerIdBits = 7L; + + /** + * 鏁版嵁鏍囪瘑id鎵鍗犵殑浣嶆暟 + */ + private final long dataCenterIdBits = 6L; + + /** + * 鏀寔鐨勬渶澶ф満鍣╥d锛岀粨鏋滄槸63 (杩欎釜绉讳綅绠楁硶鍙互寰堝揩鐨勮绠楀嚭鍑犱綅浜岃繘鍒舵暟鎵鑳借〃绀虹殑鏈澶у崄杩涘埗鏁) + * M << n = M * 2^n + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** + * 鏀寔鐨勬渶澶ф暟鎹爣璇唅d锛岀粨鏋滄槸127 + */ + private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); + + /** + * 搴忓垪鍦╥d涓崰鐨勪綅鏁 + */ + private final long sequenceBits = 11L; + + /** + * 鏈哄櫒ID鍚戝乏绉12浣 + */ + private final long workerIdShift = sequenceBits; + + /** + * 鏁版嵁鏍囪瘑id鍚戝乏绉17浣(14+6) + */ + private final long dataCenterIdShift = sequenceBits + workerIdBits; + + /** + * 鏃堕棿鎴悜宸︾Щ22浣(4+6+14) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; + + /** + * 鐢熸垚搴忓垪鐨勬帺鐮侊紝杩欓噷涓2047 + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** + * 宸ヤ綔鏈哄櫒ID(0~127) + */ + private long workerId; + + /** + * 鏁版嵁涓績ID(0~63) + */ + private long dataCenterId; + + /** + * 姣鍐呭簭鍒(0~2047) + */ + private long sequence = 0L; + + /** + * 涓婃鐢熸垚ID鐨勬椂闂存埅 + */ + private long lastTimestamp = -1L; + + + /** + * 璁剧疆鍏佽鏃堕棿鍥炴嫧鐨勬渶澶ч檺鍒10s + */ + private static final long rollBackTime = 10000L; + + + private static SnowflakeId idWorker; + + private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + static { + idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM); + } + + //==============================Constructors===================================== + + /** + * 鏋勯犲嚱鏁 + */ + private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + try { + lock.lock(); + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); + } + this.workerId = tmpWorkerId; + this.dataCenterId = dataCenterIdNum; + } catch (RuntimeException e) { + logger.error("This is not usual error!!!===>>>" + e + "<<<==="); + }finally { + lock.unlock(); + } + } + + // ==============================Methods========================================== + + /** + * 鑾峰緱涓嬩竴涓狪D (璇ユ柟娉曟槸绾跨▼瀹夊叏鐨) + * + * @return SnowflakeId + */ + private synchronized long nextId() { + long timestamp = timeGen(); + //璁剧疆涓涓厑璁稿洖鎷ㄩ檺鍒舵椂闂达紝绯荤粺鏃堕棿鍥炴嫧鑼冨洿鍦╮ollBackTime鍐呭彲浠ョ瓑寰呮牎鍑 + if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) { + timestamp = tilNextMillis(lastTimestamp); + } + //濡傛灉褰撳墠鏃堕棿灏忎簬涓婁竴娆D鐢熸垚鐨勬椂闂存埑锛岃鏄庣郴缁熸椂閽熷洖閫杩囪繖涓椂鍊欏簲褰撴姏鍑哄紓甯 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //濡傛灉鏄悓涓鏃堕棿鐢熸垚鐨勶紝鍒欒繘琛屾绉掑唴搴忓垪 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //姣鍐呭簭鍒楁孩鍑 + if (sequence == 0) { + //闃诲鍒颁笅涓涓绉,鑾峰緱鏂扮殑鏃堕棿鎴 + timestamp = tilNextMillis(lastTimestamp); + } + } + //鏃堕棿鎴虫敼鍙橈紝姣鍐呭簭鍒楅噸缃 + else { + sequence = 0L; + } + + //涓婃鐢熸垚ID鐨勬椂闂存埅 + lastTimestamp = timestamp; + + //绉讳綅骞堕氳繃鎴栬繍绠楁嫾鍒颁竴璧风粍鎴64浣嶇殑ID + return ((timestamp - twepoch) << timestampLeftShift) + | (dataCenterId << dataCenterIdShift) + | (workerId << workerIdShift) + | sequence; + } + + /** + * 闃诲鍒颁笅涓涓绉掞紝鐩村埌鑾峰緱鏂扮殑鏃堕棿鎴 + * + * @param lastTimestamp 涓婃鐢熸垚ID鐨勬椂闂存埅 + * @return 褰撳墠鏃堕棿鎴 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 杩斿洖浠ユ绉掍负鍗曚綅鐨勫綋鍓嶆椂闂 + * + * @return 褰撳墠鏃堕棿(姣) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + + /** + * 闈欐佸伐鍏风被 + * + * @return + */ + public static Long generateId() { + return idWorker.nextId(); + } + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java new file mode 100644 index 0000000..f67b842 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -0,0 +1,144 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.ArrayList; +import java.util.Map; + + +/** + * 鎻忚堪:杞崲鎴栬ˉ鍏ㄥ伐鍏风被 + * + * @author qidaijie + */ +public class TransFormMap { + private static final Log logger = LogFactory.get(); + + /** + * 鑾峰彇浠诲姟鍒楄〃 + * list鐨勬瘡涓厓绱犳槸涓涓洓鍏冨瓧绗︿覆鏁扮粍 (鏈塮ormat鏍囪瘑鐨勫瓧娈碉紝琛ュ叏鐨勫瓧娈碉紝鐢ㄥ埌鐨勫姛鑳藉嚱鏁帮紝鐢ㄥ埌鐨勫弬鏁)锛屼緥濡傦細 + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 瑙f瀽鏃ュ織锛屽苟琛ュ叏 + * + * @param message kafka Topic鍘熷鏃ュ織 + * @return 琛ュ叏鍚庣殑鏃ュ織 + */ + @SuppressWarnings("unchecked") + public static String dealCommonMessage(String message) { + try { + if (StringUtil.isNotBlank(message)) { + Map jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); + for (String[] strings : jobList) { + //鐢ㄥ埌鐨勫弬鏁扮殑鍊 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + String appendToKeyName = strings[1]; + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + String function = strings[2]; + //棰濆鐨勫弬鏁扮殑鍊 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param); + } + return JsonMapper.toJsonString(jsonMap); + } else { + return ""; + } + } catch (RuntimeException e) { + logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e + "\n" + message); + return ""; + } + } + + + /** + * 鏍规嵁schema鎻忚堪瀵瑰簲瀛楁杩涜鎿嶄綔鐨 鍑芥暟闆嗗悎 + * + * @param function 鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + * @param jsonMap 鍘熷鏃ュ織瑙f瀽map + * @param appendToKeyName 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + * @param appendTo 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + * @param logValue 鐢ㄥ埌鐨勫弬鏁扮殑鍊 + * @param param 棰濆鐨勫弬鏁扮殑鍊 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); + } + break; + case "geo_asn": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); + } + break; + case "geo_ip_country": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); + } + break; + case "set_value": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendTo == null && logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); + } + break; + case "radius_match": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString())); + } + break; + case "app_match": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); + } + break; + case "decode_of_base64": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (logValue != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java new file mode 100644 index 0000000..26795b0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java @@ -0,0 +1,153 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.ArrayList; +import java.util.HashMap; + + +/** + * 鎻忚堪:杞崲鎴栬ˉ鍏ㄥ伐鍏风被 + * + * @author qidaijie + */ +public class TransFormObject { + private static final Log logger = LogFactory.get(); + + /** + * 鍦ㄥ唴瀛樹腑鍔犺浇鍙嶅皠绫荤敤鐨刴ap + */ + private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 鍙嶅皠鎴愪竴涓被 + */ + private static Object mapObject = JsonParseUtil.generateObject(map); + + /** + * 鑾峰彇浠诲姟鍒楄〃 + * list鐨勬瘡涓厓绱犳槸涓涓洓鍏冨瓧绗︿覆鏁扮粍 (鏈塮ormat鏍囪瘑鐨勫瓧娈碉紝琛ュ叏鐨勫瓧娈碉紝鐢ㄥ埌鐨勫姛鑳藉嚱鏁帮紝鐢ㄥ埌鐨勫弬鏁)锛屼緥濡傦細 + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 瑙f瀽鏃ュ織锛屽苟琛ュ叏 + * + * @param message kafka Topic鍘熷鏃ュ織 + * @return 琛ュ叏鍚庣殑鏃ュ織 + */ + public static String dealCommonMessage(String message) { + try { + if (StringUtil.isNotBlank(message)) { + Object object = JsonMapper.fromJsonString(message, mapObject.getClass()); + for (String[] strings : jobList) { + //鐢ㄥ埌鐨勫弬鏁扮殑鍊 + Object name = JsonParseUtil.getValue(object, strings[0]); + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + String appendToKeyName = strings[1]; + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + Object appendTo = JsonParseUtil.getValue(object, appendToKeyName); + //鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + String function = strings[2]; + //棰濆鐨勫弬鏁扮殑鍊 + String param = strings[3]; + functionSet(function, object, appendToKeyName, appendTo, name, param); + } + return JsonMapper.toJsonString(object); + } else { + return ""; + } + } catch (RuntimeException e) { + logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e + "\n" + message); + return ""; + } + } + + + /** + * 鏍规嵁schema鎻忚堪瀵瑰簲瀛楁杩涜鎿嶄綔鐨 鍑芥暟闆嗗悎 + * + * @param function 鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + * @param object 鍔ㄦ丳OJO Object + * @param appendToKeyName 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + * @param appendTo 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + * @param name 鐢ㄥ埌鐨勫弬鏁扮殑鍊 + * @param param 棰濆鐨勫弬鏁扮殑鍊 + */ + private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); + } + break; + case "geo_asn": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString())); + } + break; + case "geo_ip_country": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); + } + break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param)); + } + break; + case "get_value": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, name); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param)); + } + break; + case "sub_domain": + if (appendTo == null && name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString())); + } + break; + case "radius_match": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString())); + } + break; + case "app_match": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); + } + break; + case "decode_of_base64": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param))); + } + break; + case "flattenSpec": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java new file mode 100644 index 0000000..7779da2 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -0,0 +1,145 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.JsonTypeUtils; + +import java.util.ArrayList; +import java.util.Map; + + +/** + * 鎻忚堪:杞崲鎴栬ˉ鍏ㄥ伐鍏风被 + * + * @author qidaijie + */ +public class TransFormTypeMap { + private static final Log logger = LogFactory.get(); + + /** + * 鑾峰彇浠诲姟鍒楄〃 + * list鐨勬瘡涓厓绱犳槸涓涓洓鍏冨瓧绗︿覆鏁扮粍 (鏈塮ormat鏍囪瘑鐨勫瓧娈碉紝琛ュ叏鐨勫瓧娈碉紝鐢ㄥ埌鐨勫姛鑳藉嚱鏁帮紝鐢ㄥ埌鐨勫弬鏁)锛屼緥濡傦細 + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 瑙f瀽鏃ュ織锛屽苟琛ュ叏 + * + * @param message kafka Topic鍘熷鏃ュ織 + * @return 琛ュ叏鍚庣殑鏃ュ織 + */ + @SuppressWarnings("unchecked") + public static String dealCommonMessage(String message) { + try { + if (StringUtil.isNotBlank(message)) { + Map jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); + for (String[] strings : jobList) { + //鐢ㄥ埌鐨勫弬鏁扮殑鍊 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + String appendToKeyName = strings[1]; + //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + String function = strings[2]; + //棰濆鐨勫弬鏁扮殑鍊 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param); + } + return JsonMapper.toJsonString(JsonTypeUtils.typeTransform(jsonMap)); + } else { + return ""; + } + } catch (RuntimeException e) { + logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e + "\n" + message); + return ""; + } + } + + + /** + * 鏍规嵁schema鎻忚堪瀵瑰簲瀛楁杩涜鎿嶄綔鐨 鍑芥暟闆嗗悎 + * + * @param function 鍖归厤鎿嶄綔鍑芥暟鐨勫瓧娈 + * @param jsonMap 鍘熷鏃ュ織瑙f瀽map + * @param appendToKeyName 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey + * @param appendToKeyValue 闇瑕佽ˉ鍏ㄧ殑瀛楁鐨勫 + * @param logValue 鐢ㄥ埌鐨勫弬鏁扮殑鍊 + * @param param 棰濆鐨勫弬鏁扮殑鍊 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) { + switch (function) { + case "current_timestamp": + if (!(appendToKeyValue instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); + } + break; + case "geo_asn": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); + } + break; + case "geo_ip_country": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); + } + break; + case "set_value": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendToKeyValue == null && logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString())); + } + break; + case "radius_match": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString())); + } + break; + case "app_match": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); + } + break; + case "decode_of_base64": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (logValue != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java new file mode 100644 index 0000000..9fada7b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -0,0 +1,317 @@ +package com.zdjizhi.utils.general; + +import cn.hutool.core.codec.Base64; +import cn.hutool.core.text.StrSpliter; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.DefaultProConfig; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.FormatUtils; +import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.app.AppUtils; +import com.zdjizhi.utils.hbase.HBaseUtils; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.TypeUtils; + +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author qidaijie + */ +class TransFunction { + + private static final Log logger = LogFactory.get(); + + private static final Pattern PATTERN = Pattern.compile("[0-9]*"); + + /** + * IP瀹氫綅搴撳伐鍏风被 + */ + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") + .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb") + .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") + .build(); + + /** + * 鐢熸垚褰撳墠鏃堕棿鎴崇殑鎿嶄綔 + */ + static long getCurrentTime() { + + return System.currentTimeMillis() / 1000; + } + + /** + * 鏍规嵁clientIp鑾峰彇location淇℃伅 + * + * @param ip client IP + * @return ip鍦板潃璇︾粏淇℃伅 + */ + static String getGeoIpDetail(String ip) { + + return ipLookup.cityLookupDetail(ip); + + } + + /** + * 鏍规嵁ip鑾峰彇asn淇℃伅 + * + * @param ip client/server IP + * @return ASN + */ + static String getGeoAsn(String ip) { + + return ipLookup.asnLookup(ip); + } + + /** + * 鏍规嵁ip鑾峰彇country淇℃伅 + * + * @param ip server IP + * @return 鍥藉 + */ + static String getGeoIpCountry(String ip) { + + return ipLookup.countryLookup(ip); + } + + + /** + * radius鍊熷姪HBase琛ラ綈 + * + * @param ip client IP + * @return account + */ + static String radiusMatch(String ip) { + String account = HBaseUtils.getAccount(ip.trim()); + if (StringUtil.isBlank(account)) { + logger.warn("HashMap get account is null, Ip is :" + ip); + } + return account; + } + + /** + * appId涓庣紦瀛樹腑瀵瑰簲鍏崇郴琛ュ叏appName + * + * @param appIds app id 鍒楄〃 + * @return appName + */ + static String appMatch(String appIds) { + try { + String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); + return AppUtils.getAppName(Integer.parseInt(appId)); + } catch (NumberFormatException | ClassCastException exception) { + logger.error("APP ID鍒楄〃鍒嗗壊杞崲寮傚父锛屽紓甯窤PP ID鍒楄〃:" + appIds); + return ""; + } + } + + /** + * 瑙f瀽椤剁骇鍩熷悕 + * + * @param domain 鍒濆鍩熷悕 + * @return 椤剁骇鍩熷悕 + */ + static String getTopDomain(String domain) { + try { + return FormatUtils.getTopPrivateDomain(domain); + } catch (StringIndexOutOfBoundsException outException) { + logger.error("瑙f瀽椤剁骇鍩熷悕寮傚父,寮傚父鍩熷悕:" + domain); + return ""; + } + } + + /** + * 鏍规嵁缂栫爜瑙g爜base64 + * + * @param message base64 + * @param charset 缂栫爜 + * @return 瑙g爜瀛楃涓 + */ + static String decodeBase64(String message, Object charset) { + String result = ""; + try { + if (StringUtil.isNotBlank(message)) { + if (charset == null) { + result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); + } else { + result = Base64.decodeStr(message, charset.toString()); + } + } + } catch (RuntimeException rune) { + logger.error("瑙f瀽 Base64 寮傚父,寮傚父淇℃伅:" + rune); + } + return result; + } + + /** + * 鏍规嵁琛ㄨ揪寮忚В鏋恓son + * + * @param message json + * @param expr 瑙f瀽琛ㄨ揪寮 + * @return 瑙f瀽缁撴灉 + */ + static String flattenSpec(String message, String expr) { + String flattenResult = ""; + try { + if (StringUtil.isNotBlank(expr)) { + ArrayList read = JsonPath.parse(message).read(expr); + flattenResult = read.get(0); + } + } catch (ClassCastException | InvalidPathException e) { + logger.error("璁惧鏍囩瑙f瀽寮傚父锛孾 " + expr + " ]瑙f瀽琛ㄨ揪寮忛敊璇" + e); + } + return flattenResult; + } + + + /** + * 鍒ゆ柇鏄惁涓烘棩蹇楀瓧娈,鏄垯杩斿洖瀵瑰簲value锛屽惁鍒欒繑鍥炲師濮嬪瓧绗︿覆 + * + * @param object 鍐呭瓨瀹炰綋绫 + * @param param 瀛楁鍚/鏅氬瓧绗︿覆 + * @return JSON.Value or String + */ + static Object isJsonValue(Object object, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(object, param.substring(2)); + } else { + return param; + } + } + + /** + * 鍒ゆ柇鏄惁涓烘棩蹇楀瓧娈,鏄垯杩斿洖瀵瑰簲value锛屽惁鍒欒繑鍥炲師濮嬪瓧绗︿覆 + * + * @param jsonMap 鍐呭瓨瀹炰綋绫 + * @param param 瀛楁鍚/鏅氬瓧绗︿覆 + * @return JSON.Value or String + */ + static Object isJsonValue(Map jsonMap, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(jsonMap, param.substring(2)); + } else { + return param; + } + } + + /** + * IF鍑芥暟瀹炵幇锛岃В鏋愭棩蹇楁瀯寤轰笁鐩繍绠;鍖呭惈鍒ゆ柇鏄惁涓烘暟瀛楄嫢涓烘暟瀛楀垯杞崲涓簂ong绫诲瀷杩斿洖缁撴灉銆 + * + * @param object 鍐呭瓨瀹炰綋绫 + * @param ifParam 瀛楁鍚/鏅氬瓧绗︿覆 + * @return resultA or resultB or null + */ + static Object condition(Object object, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(object, norms[0]); + Object resultA = isJsonValue(object, split[1]); + Object resultB = isJsonValue(object, split[2]); + if (direction instanceof Number) { +// result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; + result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); + } else if (direction instanceof String) { + result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); +// result = direction.equals(norms[1]) ? resultA : resultB; + } + } + } catch (RuntimeException e) { + logger.error("IF 鍑芥暟鎵ц寮傚父,寮傚父淇℃伅:" + e); + } + return result; + } + + /** + * IF鍑芥暟瀹炵幇锛岃В鏋愭棩蹇楁瀯寤轰笁鐩繍绠;鍖呭惈鍒ゆ柇鏄惁涓烘暟瀛楄嫢涓烘暟瀛楀垯杞崲涓簂ong绫诲瀷杩斿洖缁撴灉銆 + * + * @param jsonMap 鍐呭瓨瀹炰綋绫 + * @param ifParam 瀛楁鍚/鏅氬瓧绗︿覆 + * @return resultA or resultB or null + */ + static Object condition(Map jsonMap, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, norms[0]); + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (direction instanceof Number) { + result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; +// result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); + } else if (direction instanceof String) { +// result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); + result = direction.equals(norms[1]) ? resultA : resultB; + } + } + } catch (RuntimeException e) { + logger.error("IF 鍑芥暟鎵ц寮傚父,寮傚父淇℃伅:" + e); + } + return result; + } + +// /** +// * IF鍑芥暟瀹炵幇锛岃В鏋愭棩蹇楁瀯寤轰笁鐩繍绠;鍖呭惈鍒ゆ柇鏄惁涓烘暟瀛楄嫢涓烘暟瀛楀垯杞崲涓簂ong绫诲瀷杩斿洖缁撴灉銆 +// * +// * @param jsonMap 鍘熷鏃ュ織 +// * @param ifParam 瀛楁鍚/鏅氬瓧绗︿覆 +// * @return resultA or resultB or null +// */ +// static Object condition(Map jsonMap, String ifParam) { +// try { +// String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); +// String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); +// String direction = isJsonValue(jsonMap, norms[0]); +// if (StringUtil.isNotBlank(direction)) { +// if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { +// String resultA = isJsonValue(jsonMap, split[1]); +// String resultB = isJsonValue(jsonMap, split[2]); +// String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; +// Matcher isNum = PATTERN.matcher(result); +// if (isNum.matches()) { +// return Long.parseLong(result); +// } else { +// return result; +// } +// } +// } +// } catch (RuntimeException e) { +// logger.error("IF 鍑芥暟鎵ц寮傚父,寮傚父淇℃伅:" + e); +// } +// return null; +// } + + /** + * 璁剧疆鍥哄畾鍊煎嚱鏁 鑻ヤ负鏁板瓧鍒欒浆涓簂ong杩斿洖 + * + * @param param 榛樿鍊 + * @return 杩斿洖鏁板瓧鎴栧瓧绗︿覆 + */ + static Object setValue(String param) { + try { + Matcher isNum = PATTERN.matcher(param); + if (isNum.matches()) { + return Long.parseLong(param); + } else { + return param; + } + } catch (RuntimeException e) { + logger.error("SetValue 鍑芥暟寮傚父锛屽紓甯镐俊鎭細" + e); + } + return null; + } +} diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java new file mode 100644 index 0000000..60b3d09 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -0,0 +1,202 @@ +package com.zdjizhi.utils.hbase; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.DefaultProConfig; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * HBase 宸ュ叿绫 + * + * @author qidaijie + */ + +public class HBaseUtils { + private static final Log logger = LogFactory.get(); + private static Map subIdMap = new ConcurrentHashMap<>(83334); + private static Connection connection; + private static Long time; + + private static String zookeeperIp; + private static String hBaseTable; + + private static HBaseUtils hBaseUtils; + + private static void getInstance() { + hBaseUtils = new HBaseUtils(); + } + + + /** + * 鏋勯犲嚱鏁-鏂 + */ + private HBaseUtils() { + zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS; + hBaseTable = DefaultProConfig.HBASE_TABLE_NAME; + //鑾峰彇杩炴帴 + getConnection(); + //鎷夊彇鎵鏈 + getAll(); + //瀹氭椂鏇存柊 + updateCache(); + } + + private static void getConnection() { + try { + // 绠$悊Hbase鐨勯厤缃俊鎭 + Configuration configuration = HBaseConfiguration.create(); + // 璁剧疆zookeeper鑺傜偣 + configuration.set("hbase.zookeeper.quorum", zookeeperIp); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + connection = ConnectionFactory.createConnection(configuration); + time = System.currentTimeMillis(); + logger.warn("HBaseUtils get HBase connection,now to getAll()."); + } catch (IOException ioe) { + logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); + } + } + + /** + * 鏇存柊鍙橀噺 + */ + private static void change() { + if (hBaseUtils == null) { + getInstance(); + } + long nowTime = System.currentTimeMillis(); + timestampsFilter(time - 1000, nowTime + 500); + } + + + /** + * 鑾峰彇鍙樻洿鍐呭 + * + * @param startTime 寮濮嬫椂闂 + * @param endTime 缁撴潫鏃堕棿 + */ + private static void timestampsFilter(Long startTime, Long endTime) { + Long begin = System.currentTimeMillis(); + Table table = null; + ResultScanner scanner = null; + Scan scan2 = new Scan(); + try { + table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + scan2.setTimeRange(startTime, endTime); + scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + String key = Bytes.toString(CellUtil.cloneRow(cell)).trim(); + String value = Bytes.toString(CellUtil.cloneValue(cell)).trim(); + if (subIdMap.containsKey(key)) { + if (!value.equals(subIdMap.get(key))) { + subIdMap.put(key, value); + } + } else { + subIdMap.put(key, value); + } + } + } + Long end = System.currentTimeMillis(); + logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size()); + logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime); + time = endTime; + } catch (IOException ioe) { + logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<==="); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + logger.error("HBase Table Close ERROR! Exception message is:" + e); + } + } + } + } + + /** + * 鑾峰彇鎵鏈夌殑 key value + */ + private static void getAll() { + long begin = System.currentTimeMillis(); + try { + Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin)); + scanner.close(); + } catch (IOException ioe) { + logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); + } + } + + /** + * 楠岃瘉瀹氭椂鍣,姣忛殧涓娈垫椂闂撮獙璇佷竴娆-楠岃瘉鑾峰彇鏂扮殑Cookie + */ + private void updateCache() { +// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, +// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build()); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } + } catch (RuntimeException e) { + logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); + } + } + }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + } + + + /** + * 鑾峰彇 account + * + * @param clientIp client_ip + * @return account + */ + public static String getAccount(String clientIp) { + + if (hBaseUtils == null) { + getInstance(); + } + return subIdMap.get(clientIp); + + } + +} diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..1adb1d1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java @@ -0,0 +1,77 @@ +package com.zdjizhi.utils.http; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 鑾峰彇缃戝叧schema鐨勫伐鍏风被 + * + * @author qidaijie + */ +public class HttpClientUtil { + private static final Log logger = LogFactory.get(); + + /** + * 璇锋眰缃戝叧鑾峰彇schema + * + * @param http 缃戝叧url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + CloseableHttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(get); + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); + } + + return entityStringBuilder.toString(); + } + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); + } + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + return ""; + } +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..bdcc43d --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -0,0 +1,283 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.http.HttpClientUtil; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 浣跨敤FastJson瑙f瀽json鐨勫伐鍏风被 + * + * @author qidaijie + */ +public class JsonParseUtil { + + private static final Log logger = LogFactory.get(); + + private static ArrayList dropList = new ArrayList<>(); + + /** + * 妯″紡鍖归厤锛岀粰瀹氫竴涓被鍨嬪瓧绗︿覆杩斿洖涓涓被绫诲瀷 + * + * @param type 绫诲瀷 + * @return 绫荤被鍨 + */ + + private static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + /** + * 鑾峰彇灞炴у肩殑鏂规硶 + * + * @param obj 瀵硅薄 + * @param property key + * @return 灞炴х殑鍊 + */ + public static Object getValue(Object obj, String property) { + try { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } catch (RuntimeException e) { + logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e); + return null; + } + } + + /** + * 鑾峰彇灞炴у肩殑鏂规硶 + * + * @param jsonMap 鍘熷鏃ュ織 + * @param property key + * @return 灞炴х殑鍊 + */ + public static Object getValue(Map jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e); + return null; + } + } + + /** + * 鏇存柊灞炴у肩殑鏂规硶 + * + * @param jsonMap 鍘熷鏃ュ織json map + * @param property 鏇存柊鐨刱ey + * @param value 鏇存柊鐨勫 + */ + public static void setValue(Map jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e); + } + } + + /** + * 鏇存柊灞炴у肩殑鏂规硶 + * + * @param obj 瀵硅薄 + * @param property 鏇存柊鐨刱ey + * @param value 鏇存柊鐨勫 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e); + } + } + + /** + * 鏍规嵁鍙嶅皠鐢熸垚瀵硅薄鐨勬柟娉 + * + * @param properties 鍙嶅皠绫荤敤鐨刴ap + * @return 鐢熸垚鐨凮bject绫诲瀷鐨勫璞 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @param http 缃戝叧schema鍦板潃 + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getMapFromHttp(String http) { + HashMap map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(http); + Object data = JSON.parseObject(schema).get("data"); + + //鑾峰彇fields锛屽苟杞寲涓烘暟缁勶紝鏁扮粍鐨勬瘡涓厓绱犻兘鏄竴涓猲ame doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String filedStr = field.toString(); + if (checkKeepField(filedStr)) { + String name = JsonPath.read(filedStr, "$.name").toString(); + String type = JsonPath.read(filedStr, "$.type").toString(); + if (type.contains("{")) { + type = JsonPath.read(filedStr, "$.type.type").toString(); + } + //缁勫悎鐢ㄦ潵鐢熸垚瀹炰綋绫荤殑map + map.put(name, getClassName(type)); + } else { + dropList.add(filedStr); + } + } + return map; + } + + /** + * 鍒ゆ柇瀛楁鏄惁闇瑕佷繚鐣 + * + * @param message 鍗曚釜field-json + * @return true or false + */ + private static boolean checkKeepField(String message) { + boolean isKeepField = true; + boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); + if (isHiveDoc) { + boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); + if (isHiveVi) { + String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + if (FlowWriteConfig.VISIBILITY.equals(visibility)) { + isKeepField = false; + } + } + } + return isKeepField; + } + + static void dropJsonField(Map jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } + + /** + * 鏍规嵁http閾炬帴鑾峰彇schema锛岃В鏋愪箣鍚庤繑鍥炰竴涓换鍔″垪琛 (useList toList funcList paramlist) + * + * @param http 缃戝叧url + * @return 浠诲姟鍒楄〃 + */ + public static ArrayList getJobListFromHttp(String http) { + ArrayList list = new ArrayList<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + //瑙f瀽data + Object data = JSON.parseObject(schema).get("data"); + + //鑾峰彇fields锛屽苟杞寲涓烘暟缁勶紝鏁扮粍鐨勬瘡涓厓绱犻兘鏄竴涓猲ame doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if (JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..0b6bc1e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,187 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.FlowWriteException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtils { + private static final Log logger = LogFactory.get(); + /** + * 鍦ㄥ唴瀛樹腑鍔犺浇鍙嶅皠绫荤敤鐨刴ap + */ + private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 绫诲瀷杞崲 + * + * @param jsonMap 鍘熷鏃ュ織map + */ + public static Map typeTransform(Map jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (map.containsKey(key)) { + String simpleName = map.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, checkString(jsonMap.get(key))); + } + } + } + return tmpMap; + } + + /** + * String 绫诲瀷妫楠岃浆鎹㈡柟娉 + * + * @param value json value + * @return String value + */ + private static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map){ + return JsonMapper.toJsonString(value); + } + + if (value instanceof List){ + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 绫诲瀷妫楠岃浆鎹㈡柟娉 + * + * @param value json value + * @return List value + */ + private static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new FlowWriteException("can not cast to map, value : " + value); + } + + /** + * array 绫诲瀷妫楠岃浆鎹㈡柟娉 + * + * @param value json value + * @return List value + */ + private static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new FlowWriteException("can not cast to List, value : " + value); + } + + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊 + * + * @param value json value + * @return Long value + */ + private static long checkLongValue(Object value) { + Long longVal = TypeUtils.castToLong(value); + if (longVal == null) { + return 0L; + } + +// return longVal.longValue(); + return longVal; + } + + /** + * Double 绫诲瀷鏍¢獙杞崲鏂规硶 + * + * @param value json value + * @return Double value + */ + private static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊 + * + * @param value json value + * @return int value + */ + private static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + if (intVal == null) { + return 0; + } + +// return intVal.intValue(); + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..b13627f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,171 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.FlowWriteException; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 绫诲瀷鍒ゆ柇鏂规硶 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Integer 绫诲瀷鍒ゆ柇鏂规硶 + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //灏 10,20 绫绘暟鎹浆鎹负10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Double绫诲瀷鍒ゆ柇鏂规硶 + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //灏 10,20 绫绘暟鎹浆鎹负10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to double, value : " + value); + } + + /** + * Long绫诲瀷鍒ゆ柇鏂规硶 + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //灏 10,20 绫绘暟鎹浆鎹负10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to long, value : " + value); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..c220064 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,44 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", FlowWriteConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", "3000"); + properties.put("max.partition.fetch.bytes", "31457280"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + /* + * kafka闄愭祦閰嶇疆-20201117 + */ +// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID); + return properties; + } + + public static FlinkKafkaConsumer getKafkaConsumer() { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(false); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..077ae71 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,53 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.DefaultProConfig; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS); +// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); +// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", DefaultProConfig.RETRIES); + properties.put("linger.ms", DefaultProConfig.LINGER_MS); + properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", DefaultProConfig.BATCH_SIZE); + properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); + properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + /** + * kafka闄愭祦閰嶇疆-20201117 + */ +// properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID); +// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + return properties; + } + + + public static FlinkKafkaProducer getKafkaProducer() { + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( + FlowWriteConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig()); + + kafkaProducer.setLogFailuresOnly(false); +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..08fa29b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + private static Properties propKafka = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propKafka.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propKafka = null; + propService = null; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java new file mode 100644 index 0000000..2afab03 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java @@ -0,0 +1,190 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * @author qidaijie + */ +public class DistributedLock implements Lock, Watcher { + private static final Log logger = LogFactory.get(); + + private ZooKeeper zk = null; + /** + * 鏍硅妭鐐 + */ + private final String ROOT_LOCK = "/locks"; + /** + * 绔炰簤鐨勮祫婧 + */ + private String lockName; + /** + * 绛夊緟鐨勫墠涓涓攣 + */ + private String waitLock; + /** + * 褰撳墠閿 + */ + private String currentLock; + /** + * 璁℃暟鍣 + */ + private CountDownLatch countDownLatch; + + private int sessionTimeout = 2000; + + private List exceptionList = new ArrayList(); + + /** + * 閰嶇疆鍒嗗竷寮忛攣 + * + * @param config 杩炴帴鐨剈rl + * @param lockName 绔炰簤璧勬簮 + */ + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + try { + // 杩炴帴zookeeper + zk = new ZooKeeper(config, sessionTimeout, this); + Stat stat = zk.exists(ROOT_LOCK, false); + if (stat == null) { + // 濡傛灉鏍硅妭鐐逛笉瀛樺湪锛屽垯鍒涘缓鏍硅妭鐐 + zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + logger.error("Node already exists!"); + } + } + + // 鑺傜偣鐩戣鍣 + @Override + public void process(WatchedEvent event) { + if (this.countDownLatch != null) { + this.countDownLatch.countDown(); + } + } + + @Override + public void lock() { + if (exceptionList.size() > 0) { + throw new LockException(exceptionList.get(0)); + } + try { + if (this.tryLock()) { + logger.info(Thread.currentThread().getName() + " " + lockName + "鑾峰緱浜嗛攣"); + } else { + // 绛夊緟閿 + waitForLock(waitLock, sessionTimeout); + } + } catch (InterruptedException | KeeperException e) { + logger.error("鑾峰彇閿佸紓甯" + e); + } + } + + @Override + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) { + throw new LockException("閿佸悕鏈夎"); + } + // 鍒涘缓涓存椂鏈夊簭鑺傜偣 + currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + // 鍙栨墍鏈夊瓙鑺傜偣 + List subNodes = zk.getChildren(ROOT_LOCK, false); + // 鍙栧嚭鎵鏈塴ockName鐨勯攣 + List lockObjects = new ArrayList(); + for (String node : subNodes) { + String tmpNode = node.split(splitStr)[0]; + if (tmpNode.equals(lockName)) { + lockObjects.add(node); + } + } + Collections.sort(lockObjects); + // 鑻ュ綋鍓嶈妭鐐逛负鏈灏忚妭鐐癸紝鍒欒幏鍙栭攣鎴愬姛 + if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { + return true; + } + // 鑻ヤ笉鏄渶灏忚妭鐐癸紝鍒欐壘鍒拌嚜宸辩殑鍓嶄竴涓妭鐐 + String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); + waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); + } catch (InterruptedException | KeeperException e) { + logger.error("鑾峰彇閿佽繃绋嬪紓甯" + e); + } + return false; + } + + + @Override + public boolean tryLock(long timeout, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitLock, timeout); + } catch (KeeperException | InterruptedException | RuntimeException e) { + logger.error("鍒ゆ柇鏄惁閿佸畾寮傚父" + e); + } + return false; + } + + // 绛夊緟閿 + private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { + Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); + + if (stat != null) { + this.countDownLatch = new CountDownLatch(1); + // 璁℃暟绛夊緟锛岃嫢绛夊埌鍓嶄竴涓妭鐐规秷澶憋紝鍒檖recess涓繘琛宑ountDown锛屽仠姝㈢瓑寰咃紝鑾峰彇閿 + this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); + this.countDownLatch = null; + } + return true; + } + + @Override + public void unlock() { + try { + zk.delete(currentLock, -1); + currentLock = null; + zk.close(); + } catch (InterruptedException | KeeperException e) { + logger.error("鍏抽棴閿佸紓甯" + e); + } + } + + @Override + public Condition newCondition() { + return null; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 0000000..ebf4368 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,139 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * @author qidaijie + * @Package cn.ac.iie.utils.zookeeper + * @Description: + * @date 2020/11/1411:28 + */ +public class ZookeeperUtils implements Watcher { + private static final Log logger = LogFactory.get(); + + private ZooKeeper zookeeper; + + private static final int SESSION_TIME_OUT = 20000; + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.SyncConnected) { + countDownLatch.countDown(); + } + } + + + /** + * 淇敼鑺傜偣淇℃伅 + * + * @param path 鑺傜偣璺緞 + */ + public int modifyNode(String path, String zookeeperIp) { + createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); + int workerId = 0; + try { + connectZookeeper(zookeeperIp); + Stat stat = zookeeper.exists(path, true); + workerId = Integer.parseInt(getNodeDate(path)); + if (workerId > 63) { + workerId = 0; + zookeeper.setData(path, "1".getBytes(), stat.getVersion()); + } else { + String result = String.valueOf(workerId + 1); + if (stat != null) { + zookeeper.setData(path, result.getBytes(), stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } + } + } catch (KeeperException | InterruptedException e) { + logger.error("modify error Can't modify," + e); + } finally { + closeConn(); + } + logger.warn("workerID is锛" + workerId); + return workerId; + } + + /** + * 杩炴帴zookeeper + * + * @param host 鍦板潃 + */ + public void connectZookeeper(String host) { + try { + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + countDownLatch.await(); + } catch (IOException | InterruptedException e) { + logger.error("Connection to the Zookeeper Exception! message:" + e); + } + } + + /** + * 鍏抽棴杩炴帴 + */ + public void closeConn() { + try { + if (zookeeper != null) { + zookeeper.close(); + } + } catch (InterruptedException e) { + logger.error("Close the Zookeeper connection Exception! message:" + e); + } + } + + /** + * 鑾峰彇鑺傜偣鍐呭 + * + * @param path 鑺傜偣璺緞 + * @return 鍐呭/寮傚父null + */ + public String getNodeDate(String path) { + String result = null; + Stat stat = new Stat(); + try { + byte[] resByte = zookeeper.getData(path, true, stat); + + result = StrUtil.str(resByte, "UTF-8"); + } catch (KeeperException | InterruptedException e) { + logger.error("Get node information exception" + e); + } + return result; + } + + /** + * @param path 鑺傜偣鍒涘缓鐨勮矾寰 + * @param date 鑺傜偣鎵瀛樺偍鐨勬暟鎹殑byte[] + * @param acls 鎺у埗鏉冮檺绛栫暐 + */ + public void createNode(String path, byte[] date, List acls, String zookeeperIp) { + try { + connectZookeeper(zookeeperIp); + Stat exists = zookeeper.exists(path, true); + if (exists == null) { + Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); + if (existsSnowflakeld == null) { + zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); + } + zookeeper.create(path, date, acls, CreateMode.PERSISTENT); + } else { + logger.warn("Node already exists ! Don't need to create"); + } + } catch (KeeperException | InterruptedException e) { + logger.error(e); + } finally { + closeConn(); + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 鎺у埗鍙版棩蹇楄缃 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 鏂囦欢鏃ュ織璁剧疆 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#璺緞璇风敤鐩稿璺緞锛屽仛濂界浉鍏虫祴璇曡緭鍑哄埌搴旂敤鐩笅 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 閰嶇疆锛宑om.nis.web.dao鏄痬ybatis鎺ュ彛鎵鍦ㄥ寘 +log4j.logger.com.nis.web.dao=debug +#bonecp鏁版嵁婧愰厤缃 +log4j.category.com.jolbox=debug,console + + diff --git a/src/main/logback.xml b/src/main/logback.xml new file mode 100644 index 0000000..a508b6b --- /dev/null +++ b/src/main/logback.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + ${LOG_PATTERN} + + + + + + + ${LOG_FILE_PATH} + + 30 + + + 20MB + + + + + ${LOG_PATTERN} + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/KafkaLogSend.java b/src/test/java/com/zdjizhi/KafkaLogSend.java new file mode 100644 index 0000000..5c3feb3 --- /dev/null +++ b/src/test/java/com/zdjizhi/KafkaLogSend.java @@ -0,0 +1,92 @@ +package com.zdjizhi; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.DefaultProConfig; +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; + +/** + * NTC绯荤粺閰嶇疆浜х敓鏃ュ織鍐欏叆鏁版嵁涓績绫 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ + +public class KafkaLogSend { + private static final Log logger = LogFactory.get(); + + /** + * kafka鐢熶骇鑰咃紝鐢ㄤ簬鍚慿afka涓彂閫佹秷鎭 + */ + private static org.apache.kafka.clients.producer.Producer kafkaProducer; + + /** + * kafka鐢熶骇鑰呴傞厤鍣紙鍗曚緥锛夛紝鐢ㄦ潵浠g悊kafka鐢熶骇鑰呭彂閫佹秷鎭 + */ + private static KafkaLogSend kafkaLogSend; + + private KafkaLogSend() { + initKafkaProducer(); + } + + public static KafkaLogSend getInstance() { + if (kafkaLogSend == null) { + kafkaLogSend = new KafkaLogSend(); + } + return kafkaLogSend; + } + + + public void sendMessage(String message) { +// for (String value : list) { + kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("鍐欏叆test鍑虹幇寮傚父", exception); + } + } + }); +// } +// kafkaProducer.flush(); + logger.debug("Log sent to National Center successfully!!!!!"); + } + + /** + * 鏍规嵁kafka鐢熶骇鑰呴厤缃俊鎭垵濮嬪寲kafka娑堟伅鐢熶骇鑰,鍙垵濮嬪寲涓娆 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("retries", DefaultProConfig.RETRIES); + properties.put("linger.ms", DefaultProConfig.LINGER_MS); + properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", DefaultProConfig.BATCH_SIZE); + properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); + properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + properties.put("security.protocol", "SSL"); + properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks"); + properties.put("ssl.keystore.password", "ceiec2019"); + properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks"); + properties.put("ssl.truststore.password", "ceiec2019"); + properties.put("ssl.key.password", "ceiec2019"); + + + /* + * kafka闄愭祦閰嶇疆-20201117 + */ +// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID); +// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + + kafkaProducer = new KafkaProducer<>(properties); + } + + +} diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java new file mode 100644 index 0000000..3bb6d1c --- /dev/null +++ b/src/test/java/com/zdjizhi/KafkaTest.java @@ -0,0 +1,53 @@ +package com.zdjizhi; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/8/217:39 + */ +public class KafkaTest { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); +// properties.put("retries", DefaultProConfig.RETRIES); +// properties.put("linger.ms", DefaultProConfig.LINGER_MS); +// properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); +// properties.put("batch.size", DefaultProConfig.BATCH_SIZE); +// properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); +// properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + properties.put("security.protocol", "SSL"); +// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks"); + properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks"); + properties.put("ssl.keystore.password", "ceiec2019"); +// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks"); + properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks"); + properties.put("ssl.truststore.password", "ceiec2019"); + properties.put("ssl.key.password", "ceiec2019"); + + Producer producer = new KafkaProducer(properties); + + producer.send(new ProducerRecord<>("test", "hello!"), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("鍐欏叆test鍑虹幇寮傚父", exception); + } + } + }); + + producer.close(); + } +} diff --git a/src/test/java/com/zdjizhi/LocationTest.java b/src/test/java/com/zdjizhi/LocationTest.java new file mode 100644 index 0000000..e7b2d15 --- /dev/null +++ b/src/test/java/com/zdjizhi/LocationTest.java @@ -0,0 +1,28 @@ +package com.zdjizhi; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.IpLookup; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/8/1811:34 + */ +public class LocationTest { + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v4.mmdb") + .loadDataFileV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v6.mmdb") + .loadDataFilePrivateV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v4.mmdb") + .loadDataFilePrivateV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v6.mmdb") + .build(); + + @Test + public void IpLocationTest() { + System.out.println(ipLookup.cityLookupDetail("24.241.112.0")); + System.out.println(ipLookup.cityLookupDetail("1.1.1.1")); + System.out.println(ipLookup.cityLookupDetail("192.168.50.58")); + System.out.println(ipLookup.cityLookupDetail("2600:1700:9010::")); + } +}