From 21b150c2bb776a371202ae152bd930436c7c4910 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Tue, 16 Mar 2021 14:48:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DEAL4=E4=B8=AD=E9=AB=98?= =?UTF-8?q?=E7=BA=A7=E8=AD=A6=E5=91=8A=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/service_flow_config.properties | 3 -- .../zdjizhi}/bolt/CompletionBolt.java | 31 +++-------- .../zdjizhi}/bolt/kafka/LogSendBolt.java | 13 ++--- .../zdjizhi}/common/FlowWriteConfig.java | 6 +-- .../zdjizhi}/common/KafkaProConfig.java | 4 +- .../zdjizhi}/spout/CustomizedKafkaSpout.java | 9 ++-- .../topology/LogFlowWriteTopology.java | 18 +++---- .../zdjizhi}/topology/StormRunner.java | 2 +- .../zdjizhi}/utils/general/SnowflakeId.java | 35 +++---------- .../utils/general/TransFormUtils.java | 36 ++++++------- .../zdjizhi}/utils/general/TransFunction.java | 19 ++++--- .../zdjizhi}/utils/hbase/HBaseUtils.java | 52 +++++++++++-------- .../zdjizhi}/utils/http/HttpClientUtil.java | 9 ++-- .../zdjizhi}/utils/json/JsonParseUtil.java | 12 ++--- .../zdjizhi}/utils/kafka/KafkaLogSend.java | 11 ++-- .../utils/system/FlowWriteConfigurations.java | 2 +- .../zdjizhi}/utils/system/TupleUtils.java | 2 +- .../utils/zookeeper/DistributedLock.java | 8 +-- .../utils/zookeeper/ZookeeperUtils.java | 9 ++-- src/test/java/cn/ac/iie/test/DomainTest.java | 23 -------- .../java/cn/ac/iie/test/FunctionIfTest.java | 32 ------------ 22 files changed, 126 insertions(+), 212 deletions(-) rename src/main/java/{cn/ac/iie => com/zdjizhi}/bolt/CompletionBolt.java (55%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/bolt/kafka/LogSendBolt.java (87%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/common/FlowWriteConfig.java (95%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/common/KafkaProConfig.java (89%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/spout/CustomizedKafkaSpout.java (94%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/topology/LogFlowWriteTopology.java (88%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/topology/StormRunner.java (97%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/general/SnowflakeId.java (80%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/general/TransFormUtils.java (87%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/general/TransFunction.java (92%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/hbase/HBaseUtils.java (81%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/http/HttpClientUtil.java (90%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/json/JsonParseUtil.java (96%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/kafka/KafkaLogSend.java (92%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/system/FlowWriteConfigurations.java (98%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/system/TupleUtils.java (94%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/zookeeper/DistributedLock.java (95%) rename src/main/java/{cn/ac/iie => com/zdjizhi}/utils/zookeeper/ZookeeperUtils.java (95%) delete mode 100644 src/test/java/cn/ac/iie/test/DomainTest.java delete mode 100644 src/test/java/cn/ac/iie/test/FunctionIfTest.java diff --git a/pom.xml b/pom.xml index 008a5e3..d820ed7 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ - cn.ac.iie.topology.LogFlowWriteTopology + com.zdjizhi.topology.LogFlowWriteTopology diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3f49225..181435f 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -91,9 +91,6 @@ max.failure.num=20 #邮件默认编码 mail.default.charset=UTF-8 -#需不需要补subscriber_id,需要则为yes,不需要为no -need.complete.subid=yes - #需不要补全,不需要则原样日志输出 log.need.complete=yes diff --git a/src/main/java/cn/ac/iie/bolt/CompletionBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java similarity index 55% rename from src/main/java/cn/ac/iie/bolt/CompletionBolt.java rename to src/main/java/com/zdjizhi/bolt/CompletionBolt.java index 0053291..309cf56 100644 --- a/src/main/java/cn/ac/iie/bolt/CompletionBolt.java +++ b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java @@ -1,12 +1,9 @@ -package cn.ac.iie.bolt; +package com.zdjizhi.bolt; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.general.SnowflakeId; -import cn.ac.iie.utils.hbase.HBaseUtils; -import cn.ac.iie.utils.system.TupleUtils; -import com.zdjizhi.utils.FormatUtils; +import com.zdjizhi.common.FlowWriteConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -15,19 +12,17 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import java.util.HashMap; import java.util.Map; -import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; +import static com.zdjizhi.utils.general.TransFormUtils.dealCommonMessage; /** * @author qidaijie */ public class CompletionBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(CompletionBolt.class); private static final long serialVersionUID = 9006119186526123734L; - private static final String IS = "yes"; + private static final Log logger = LogFactory.get(); @Override @@ -39,30 +34,16 @@ public class CompletionBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { -// if (TupleUtils.isTick(tuple)) { -// if (IS.equals(FlowWriteConfig.NEED_COMPLETE_SUBID)) { -// HBaseUtils.change(); -// } -// } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { basicOutputCollector.emit(new Values(dealCommonMessage(message))); } -// } } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); e.printStackTrace(); } } -// @Override -// public Map getComponentConfiguration() { -// Map conf = new HashMap(16); -// conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, -// FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); -// return conf; -// } - @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { diff --git a/src/main/java/cn/ac/iie/bolt/kafka/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java similarity index 87% rename from src/main/java/cn/ac/iie/bolt/kafka/LogSendBolt.java rename to src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java index 35bda1a..9cf14b8 100644 --- a/src/main/java/cn/ac/iie/bolt/kafka/LogSendBolt.java +++ b/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java @@ -1,10 +1,11 @@ -package cn.ac.iie.bolt.kafka; +package com.zdjizhi.bolt.kafka; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.kafka.KafkaLogSend; -import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.kafka.KafkaLogSend; +import com.zdjizhi.utils.system.TupleUtils; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -22,7 +23,7 @@ import java.util.Map; */ public class LogSendBolt extends BaseBasicBolt { private static final long serialVersionUID = -3663610927224396615L; - private static Logger logger = Logger.getLogger(LogSendBolt.class); + private static final Log logger = LogFactory.get(); private List list; private KafkaLogSend kafkaLogSend; diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java similarity index 95% rename from src/main/java/cn/ac/iie/common/FlowWriteConfig.java rename to src/main/java/com/zdjizhi/common/FlowWriteConfig.java index cd7d0c2..0502aef 100644 --- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -1,7 +1,7 @@ -package cn.ac.iie.common; +package com.zdjizhi.common; -import cn.ac.iie.utils.system.FlowWriteConfigurations; +import com.zdjizhi.utils.system.FlowWriteConfigurations; /** * @author Administrator @@ -13,6 +13,7 @@ public class FlowWriteConfig { public static final String FORMAT_SPLITTER = ","; public static final String IS_JSON_KEY_TAG = "$."; public static final String IF_CONDITION_SPLITTER = "="; + public static final String MODEL = "remote"; /** * System */ @@ -29,7 +30,6 @@ public class FlowWriteConfig { public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); - public static final String NEED_COMPLETE_SUBID = FlowWriteConfigurations.getStringProperty(0, "need.complete.subid"); public static final String LOG_NEED_COMPLETE = FlowWriteConfigurations.getStringProperty(0, "log.need.complete"); /** diff --git a/src/main/java/cn/ac/iie/common/KafkaProConfig.java b/src/main/java/com/zdjizhi/common/KafkaProConfig.java similarity index 89% rename from src/main/java/cn/ac/iie/common/KafkaProConfig.java rename to src/main/java/com/zdjizhi/common/KafkaProConfig.java index eb14465..6e7d616 100644 --- a/src/main/java/cn/ac/iie/common/KafkaProConfig.java +++ b/src/main/java/com/zdjizhi/common/KafkaProConfig.java @@ -1,7 +1,7 @@ -package cn.ac.iie.common; +package com.zdjizhi.common; -import cn.ac.iie.utils.system.FlowWriteConfigurations; +import com.zdjizhi.utils.system.FlowWriteConfigurations; /** * @author Administrator diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java similarity index 94% rename from src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java rename to src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java index d8a9946..d8fde9a 100644 --- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java +++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java @@ -1,11 +1,12 @@ -package cn.ac.iie.spout; +package com.zdjizhi.spout; -import cn.ac.iie.common.FlowWriteConfig; +import com.zdjizhi.common.FlowWriteConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.log4j.Logger; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -27,7 +28,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { private KafkaConsumer consumer; private SpoutOutputCollector collector = null; private TopologyContext context = null; - private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + private static final Log logger = LogFactory.get(); private static Properties createConsumerConfig() { diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java similarity index 88% rename from src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java rename to src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 7b2d68c..f8146a2 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -1,11 +1,12 @@ -package cn.ac.iie.topology; +package com.zdjizhi.topology; -import cn.ac.iie.bolt.CompletionBolt; -import cn.ac.iie.bolt.kafka.LogSendBolt; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.spout.CustomizedKafkaSpout; -import org.apache.log4j.Logger; +import com.zdjizhi.bolt.CompletionBolt; +import com.zdjizhi.bolt.kafka.LogSendBolt; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.spout.CustomizedKafkaSpout; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; @@ -19,7 +20,7 @@ import org.apache.storm.topology.TopologyBuilder; */ public class LogFlowWriteTopology { - private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + private static final Log logger = LogFactory.get(); private final String topologyName; private final Config topologyConfig; private TopologyBuilder builder; @@ -72,9 +73,8 @@ public class LogFlowWriteTopology { public static void main(String[] args) throws Exception { LogFlowWriteTopology flowWriteTopology; boolean runLocally = true; - String parameter = "remote"; int size = 2; - if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { + if (args.length >= size && FlowWriteConfig.MODEL.equalsIgnoreCase(args[1])) { runLocally = false; flowWriteTopology = new LogFlowWriteTopology(args[0]); } else { diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/com/zdjizhi/topology/StormRunner.java similarity index 97% rename from src/main/java/cn/ac/iie/topology/StormRunner.java rename to src/main/java/com/zdjizhi/topology/StormRunner.java index f5094a4..472cde7 100644 --- a/src/main/java/cn/ac/iie/topology/StormRunner.java +++ b/src/main/java/com/zdjizhi/topology/StormRunner.java @@ -1,4 +1,4 @@ -package cn.ac.iie.topology; +package com.zdjizhi.topology; import org.apache.storm.Config; diff --git a/src/main/java/cn/ac/iie/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java similarity index 80% rename from src/main/java/cn/ac/iie/utils/general/SnowflakeId.java rename to src/main/java/com/zdjizhi/utils/general/SnowflakeId.java index 0b80158..0d50226 100644 --- a/src/main/java/cn/ac/iie/utils/general/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -1,10 +1,10 @@ -package cn.ac.iie.utils.general; +package com.zdjizhi.utils.general; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.zookeeper.DistributedLock; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; -import com.zdjizhi.utils.ZooKeeperLock; -import org.apache.log4j.Logger; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.zookeeper.DistributedLock; +import com.zdjizhi.utils.zookeeper.ZookeeperUtils; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; /** * 雪花算法 @@ -12,7 +12,7 @@ import org.apache.log4j.Logger; * @author qidaijie */ public class SnowflakeId { - private static Logger logger = Logger.getLogger(SnowflakeId.class); + private static final Log logger = LogFactory.get(); /** * 共64位 第一位为符号位 默认0 @@ -132,27 +132,6 @@ public class SnowflakeId { }finally { lock.unlock(); } - -// ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1"); -// if (lock.lock()) { -// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); -// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { -// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); -// } -// if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { -// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); -// } -// this.workerId = tmpWorkerId; -// this.dataCenterId = dataCenterIdNum; -// try { -// lock.unlock(); -// } catch (InterruptedException ie) { -// ie.printStackTrace(); -// } catch (Exception e) { -// e.printStackTrace(); -// logger.error("This is not usual error!!!===>>>" + e + "<<<==="); -// } -// } } // ==============================Methods========================================== diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java similarity index 87% rename from src/main/java/cn/ac/iie/utils/general/TransFormUtils.java rename to src/main/java/com/zdjizhi/utils/general/TransFormUtils.java index 6b04ce4..823c19b 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java @@ -1,19 +1,15 @@ -package cn.ac.iie.utils.general; +package com.zdjizhi.utils.general; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.json.JsonParseUtil; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.json.JsonParseUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.Encodes; -import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; -import com.zdjizhi.utils.JsonMapper; -import org.apache.log4j.Logger; import java.util.*; -import static cn.ac.iie.utils.general.TransFunction.*; - /** * 描述:转换或补全工具类 @@ -21,7 +17,7 @@ import static cn.ac.iie.utils.general.TransFunction.*; * @author qidaijie */ public class TransFormUtils { - private static Logger logger = Logger.getLogger(TransFormUtils.class); + private static final Log logger = LogFactory.get(); /** * 在内存中加载反射类用的map @@ -105,7 +101,7 @@ public class TransFormUtils { switch (function) { case "current_timestamp": if ((long) appendTo == 0L) { - JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime()); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime()); } break; case "snowflake_id": @@ -115,22 +111,22 @@ public class TransFormUtils { break; case "geo_ip_detail": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(ipLookup, name.toString())); } break; case "geo_asn": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(ipLookup, name.toString())); } break; case "geo_ip_country": if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(ipLookup, name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(ipLookup, name.toString())); } break; case "set_value": if (name != null && param != null) { - JsonParseUtil.setValue(object, appendToKeyName, setValue(param)); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param)); } break; case "get_value": @@ -140,27 +136,27 @@ public class TransFormUtils { break; case "if": if (param != null) { - JsonParseUtil.setValue(object, appendToKeyName, condition(object, param)); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param)); } break; case "sub_domain": if (appendTo == null && name != null) { - JsonParseUtil.setValue(object, appendToKeyName, getTopDomain(name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString())); } break; case "radius_match": if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString())); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString())); } break; case "decode_of_base64": if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param))); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param))); } break; case "flattenSpec": if (name != null && param != null) { - JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param))); + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), TransFunction.isJsonValue(object, param))); } break; default: diff --git a/src/main/java/cn/ac/iie/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java similarity index 92% rename from src/main/java/cn/ac/iie/utils/general/TransFunction.java rename to src/main/java/com/zdjizhi/utils/general/TransFunction.java index ab834c0..76b2643 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -1,9 +1,9 @@ -package cn.ac.iie.utils.general; +package com.zdjizhi.utils.general; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; -import cn.ac.iie.utils.json.JsonParseUtil; -import cn.ac.iie.utils.system.LogPrintUtil; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.hbase.HBaseUtils; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.system.LogPrintUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.jayway.jsonpath.InvalidPathException; @@ -12,7 +12,6 @@ import com.zdjizhi.utils.Encodes; import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.regex.Matcher; @@ -23,11 +22,11 @@ import java.util.regex.Pattern; */ class TransFunction { - private static Logger logger = Logger.getLogger(TransFunction.class); + private static final Log logger = LogFactory.get(); private static final Log log = LogFactory.get(); - private static final Pattern pattern = Pattern.compile("[0-9]*"); + private static final Pattern PATTERN = Pattern.compile("[0-9]*"); /** * 生成当前时间戳的操作 @@ -181,7 +180,7 @@ class TransFunction { String resultA = isJsonValue(object, split[1]); String resultB = isJsonValue(object, split[2]); String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; - Matcher isNum = pattern.matcher(result); + Matcher isNum = PATTERN.matcher(result); if (isNum.matches()) { return Long.parseLong(result); } else { @@ -203,7 +202,7 @@ class TransFunction { */ static Object setValue(String param) { try { - Matcher isNum = pattern.matcher(param); + Matcher isNum = PATTERN.matcher(param); if (isNum.matches()) { return Long.parseLong(param); } else { diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java similarity index 81% rename from src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java rename to src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java index 5371b06..4be1683 100644 --- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -1,7 +1,8 @@ -package cn.ac.iie.utils.hbase; +package com.zdjizhi.utils.hbase; -import cn.ac.iie.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.common.FlowWriteConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -9,14 +10,13 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.log4j.Logger; import java.io.IOException; -import java.util.HashMap; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * HBase 工具类 @@ -25,9 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class HBaseUtils { - private final static Logger logger = Logger.getLogger(HBaseUtils.class); - private static Map subIdMap = new ConcurrentHashMap<>(83334); -// private static Map subId/Map = new HashMap<>(83334); + private static final Log logger = LogFactory.get(); + private static Map subIdMap = new ConcurrentHashMap<>(83334); private static Connection connection; private static Long time; @@ -48,14 +47,14 @@ public class HBaseUtils { zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS; hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME; //获取连接 - getHBaseConn(); + getHbaseConn(); //拉取所有 getAll(); //定时更新 updateHabaseCache(); } - private static void getHBaseConn() { + private static void getHbaseConn() { try { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); @@ -67,7 +66,6 @@ public class HBaseUtils { connection = ConnectionFactory.createConnection(configuration); time = System.currentTimeMillis(); logger.warn("HBaseUtils get HBase connection,now to getAll()."); -// getAll(); } catch (IOException ioe) { logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); ioe.printStackTrace(); @@ -80,7 +78,7 @@ public class HBaseUtils { /** * 更新变量 */ - public static void change() { + private static void change() { if (hBaseUtils == null) { getHBaseInstance(); } @@ -89,10 +87,6 @@ public class HBaseUtils { } - public static void main(String[] args) { - change(); - } - /** * 获取变更内容 * @@ -177,8 +171,10 @@ public class HBaseUtils { * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie */ private void updateHabaseCache() { - Timer timer = new Timer(); - timer.scheduleAtFixedRate(new TimerTask() { +// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, +// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build()); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { @@ -188,7 +184,21 @@ public class HBaseUtils { e.printStackTrace(); } } - }, 1, 1000 * FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);//起始1ms,以后每隔60s + }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + +// +// Timer timer = new Timer(); +// timer.scheduleAtFixedRate(new TimerTask() { +// @Override +// public void run() { +// try { +// change(); +// } catch (Exception e) { +// logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); +// e.printStackTrace(); +// } +// } +// }, 1, 1000 * FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);//起始1ms,以后每隔60s } diff --git a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java similarity index 90% rename from src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java rename to src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java index f544859..77725ee 100644 --- a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java +++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java @@ -1,12 +1,13 @@ -package cn.ac.iie.utils.http; +package com.zdjizhi.utils.http; -import cn.ac.iie.utils.system.LogPrintUtil; +import com.zdjizhi.utils.system.LogPrintUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.log4j.Logger; import java.io.BufferedReader; import java.io.IOException; @@ -19,7 +20,7 @@ import java.io.InputStreamReader; */ public class HttpClientUtil { - private static Logger logger = Logger.getLogger(HttpClientUtil.class); + private static final Log logger = LogFactory.get(); /** * 请求网关获取schema diff --git a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java similarity index 96% rename from src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java rename to src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 4b16a06..058c6f8 100644 --- a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -1,7 +1,9 @@ -package cn.ac.iie.utils.json; +package com.zdjizhi.utils.json; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.http.HttpClientUtil; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.http.HttpClientUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; @@ -9,8 +11,6 @@ import com.jayway.jsonpath.JsonPath; import com.zdjizhi.utils.StringUtil; import net.sf.cglib.beans.BeanGenerator; import net.sf.cglib.beans.BeanMap; -import org.apache.log4j.Logger; -import scala.annotation.meta.field; import java.util.*; @@ -21,7 +21,7 @@ import java.util.*; * @author qidaijie */ public class JsonParseUtil { - private static Logger logger = Logger.getLogger(JsonParseUtil.class); + private static final Log logger = LogFactory.get(); /** * 模式匹配,给定一个类型字符串返回一个类类型 diff --git a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java similarity index 92% rename from src/main/java/cn/ac/iie/utils/kafka/KafkaLogSend.java rename to src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java index 26e2e43..b7c270b 100644 --- a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogSend.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java @@ -1,9 +1,10 @@ -package cn.ac.iie.utils.kafka; +package com.zdjizhi.utils.kafka; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.common.KafkaProConfig; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.common.KafkaProConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.kafka.clients.producer.*; -import org.apache.log4j.Logger; import java.util.List; import java.util.Properties; @@ -16,7 +17,7 @@ import java.util.Properties; */ public class KafkaLogSend { - private static Logger logger = Logger.getLogger(KafkaLogSend.class); + private static final Log logger = LogFactory.get(); /** * kafka生产者,用于向kafka中发送消息 diff --git a/src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java similarity index 98% rename from src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java rename to src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java index 3078b66..ecfd0c5 100644 --- a/src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -1,4 +1,4 @@ -package cn.ac.iie.utils.system; +package com.zdjizhi.utils.system; import java.util.Properties; diff --git a/src/main/java/cn/ac/iie/utils/system/TupleUtils.java b/src/main/java/com/zdjizhi/utils/system/TupleUtils.java similarity index 94% rename from src/main/java/cn/ac/iie/utils/system/TupleUtils.java rename to src/main/java/com/zdjizhi/utils/system/TupleUtils.java index 53e14ca..c55c0e8 100644 --- a/src/main/java/cn/ac/iie/utils/system/TupleUtils.java +++ b/src/main/java/com/zdjizhi/utils/system/TupleUtils.java @@ -1,4 +1,4 @@ -package cn.ac.iie.utils.system; +package com.zdjizhi.utils.system; import org.apache.storm.Constants; import org.apache.storm.tuple.Tuple; diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java similarity index 95% rename from src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java rename to src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java index 6df8c4b..04658fb 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java +++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java @@ -1,5 +1,7 @@ -package cn.ac.iie.utils.zookeeper; +package com.zdjizhi.utils.zookeeper; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -17,7 +19,7 @@ import java.util.concurrent.locks.Lock; * @author qidaijie */ public class DistributedLock implements Lock, Watcher { - private static Logger logger = Logger.getLogger(DistributedLock.class); + private static final Log logger = LogFactory.get(); private ZooKeeper zk = null; /** @@ -81,7 +83,7 @@ public class DistributedLock implements Lock, Watcher { } try { if (this.tryLock()) { - System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); } else { // 等待锁 waitForLock(waitLock, sessionTimeout); diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java similarity index 95% rename from src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java rename to src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java index 9868076..fb1c3eb 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java @@ -1,7 +1,8 @@ -package cn.ac.iie.utils.zookeeper; +package com.zdjizhi.utils.zookeeper; -import cn.ac.iie.utils.system.LogPrintUtil; -import org.apache.log4j.Logger; +import com.zdjizhi.utils.system.LogPrintUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -17,7 +18,7 @@ import java.util.concurrent.CountDownLatch; * @date 2020/11/1411:28 */ public class ZookeeperUtils implements Watcher { - private static Logger logger = Logger.getLogger(com.zdjizhi.utils.ZookeeperUtils.class); + private static final Log logger = LogFactory.get(); private ZooKeeper zookeeper; diff --git a/src/test/java/cn/ac/iie/test/DomainTest.java b/src/test/java/cn/ac/iie/test/DomainTest.java deleted file mode 100644 index 1e7c7a9..0000000 --- a/src/test/java/cn/ac/iie/test/DomainTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.ac.iie.test; - -import com.zdjizhi.utils.FormatUtils; -import org.junit.Test; - -import java.util.Date; - - -/** - * @author qidaijie - * @Package com.zdjizhi.flume - * @Description: - * @date 2020/11/2212:06 - */ -public class DomainTest { - - @Test - public void getDomainTest() { - String url = "array808.prod.do.dsp.mp.microsoft.com"; - System.out.println(FormatUtils.getTopPrivateDomain(url)); - - } -} diff --git a/src/test/java/cn/ac/iie/test/FunctionIfTest.java b/src/test/java/cn/ac/iie/test/FunctionIfTest.java deleted file mode 100644 index 8e332ad..0000000 --- a/src/test/java/cn/ac/iie/test/FunctionIfTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package cn.ac.iie.test; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.IpLookup; -import org.junit.Test; - -/** - * @author qidaijie - * @Package cn.ac.iie.test - * @Description: - * @date 2020/12/2717:33 - */ -public class FunctionIfTest { - private static final Log log = LogFactory.get(); - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") - .build(); - - @Test - public void Test() { - String ip = "192.168.50.65"; - System.out.println(ipLookup.cityLookupDetail(ip)); - System.out.println(ipLookup.latLngLookup(ip)); - System.out.println(ipLookup.provinceLookup(ip)); - System.out.println(ipLookup.countryLookup(ip)); - System.out.println(ipLookup.cityLookup(ip)); - - } -}