diff --git a/pom.xml b/pom.xml index a3294cf..008a5e3 100644 --- a/pom.xml +++ b/pom.xml @@ -109,7 +109,7 @@ org.apache.storm storm-core ${storm.version} - provided + slf4j-log4j12 @@ -303,5 +303,11 @@ 0.9.0 + + cn.hutool + hutool-all + 5.5.2 + + diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 6143acf..3f49225 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -13,7 +13,7 @@ zookeeper.servers=192.168.44.12:2181 hbase.zookeeper.servers=192.168.44.12:2181 #定位库地址 -ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\ +ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\dll\\ #ip.library=/home/bigdata/topology/dat/ #网关的schema位置 @@ -91,22 +91,9 @@ max.failure.num=20 #邮件默认编码 mail.default.charset=UTF-8 -#压缩模式 none or snappy -kafka.compression.type=none - #需不需要补subscriber_id,需要则为yes,不需要为no need.complete.subid=yes #需不要补全,不需要则原样日志输出 log.need.complete=yes - -#--------------------------------influx------------------------------# -#influx地址 -influx.ip=http://192.168.40.151:8086 - -#influx用户名 -influx.username=admin - -#influx密码 -influx.password=admin diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java index 731b12d..cd7d0c2 100644 --- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java @@ -32,13 +32,6 @@ public class FlowWriteConfig { public static final String NEED_COMPLETE_SUBID = FlowWriteConfigurations.getStringProperty(0, "need.complete.subid"); public static final String LOG_NEED_COMPLETE = FlowWriteConfigurations.getStringProperty(0, "log.need.complete"); - /** - * influxDB - */ - public static final String INFLUX_IP = FlowWriteConfigurations.getStringProperty(0, "influx.ip"); - public static final String INFLUX_USERNAME = FlowWriteConfigurations.getStringProperty(0, "influx.username"); - public static final String INFLUX_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "influx.password"); - /** * kafka */ diff --git a/src/main/java/cn/ac/iie/utils/general/TransFunction.java b/src/main/java/cn/ac/iie/utils/general/TransFunction.java index 47f3292..ab834c0 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFunction.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFunction.java @@ -3,9 +3,11 @@ package cn.ac.iie.utils.general; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.json.JsonParseUtil; +import cn.ac.iie.utils.system.LogPrintUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.PathNotFoundException; import com.zdjizhi.utils.Encodes; import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; @@ -13,7 +15,6 @@ import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -21,7 +22,11 @@ import java.util.regex.Pattern; * @author qidaijie */ class TransFunction { + private static Logger logger = Logger.getLogger(TransFunction.class); + + private static final Log log = LogFactory.get(); + private static final Pattern pattern = Pattern.compile("[0-9]*"); /** @@ -74,7 +79,7 @@ class TransFunction { static String radiusMatch(String ip) { String account = HBaseUtils.getAccount(ip.trim()); if (StringUtil.isBlank(account)) { - logger.warn("HashMap get Account is null,IP is :" + ip); + log.warn("HashMap get account is null, Ip is :{}", ip); } return account; } @@ -89,7 +94,8 @@ class TransFunction { try { return FormatUtils.getTopPrivateDomain(domain); } catch (StringIndexOutOfBoundsException outException) { - logger.error("解析顶级域名异常,异常域名:" + domain, outException); + log.error("解析顶级域名异常,异常域名:{}" + domain); + logger.error(LogPrintUtil.print(outException)); return ""; } } @@ -112,7 +118,7 @@ class TransFunction { } } } catch (Exception e) { - logger.error("解析 Base64 异常,异常信息:" + e); + logger.error("解析 Base64 异常,异常信息:" + LogPrintUtil.print(e)); } return result; } @@ -132,7 +138,7 @@ class TransFunction { flattenResult = read.get(0); } } catch (ClassCastException | InvalidPathException e) { - logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e); + log.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + LogPrintUtil.print(e)); } return flattenResult; } @@ -184,14 +190,14 @@ class TransFunction { } } } catch (Exception e) { - logger.error("IF 函数执行异常,异常信息:" + e); - e.printStackTrace(); + logger.error("IF 函数执行异常,异常信息:" + LogPrintUtil.print(e)); } return null; } /** * 设置固定值函数 若为数字则转为long返回 + * * @param param 默认值 * @return 返回数字或字符串 */ @@ -204,8 +210,7 @@ class TransFunction { return param; } } catch (Exception e) { - logger.error("SetValue 函数异常,异常信息:" + e); - e.printStackTrace(); + logger.error("SetValue 函数异常,异常信息:" + LogPrintUtil.print(e)); } return null; } diff --git a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java index 347a69b..f544859 100644 --- a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java +++ b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java @@ -1,10 +1,12 @@ package cn.ac.iie.utils.http; +import cn.ac.iie.utils.system.LogPrintUtil; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.log4j.Logger; import java.io.BufferedReader; import java.io.IOException; @@ -17,6 +19,8 @@ import java.io.InputStreamReader; */ public class HttpClientUtil { + private static Logger logger = Logger.getLogger(HttpClientUtil.class); + /** * 请求网关获取schema * @param http 网关url @@ -25,28 +29,32 @@ public class HttpClientUtil { public static String requestByGetMethod(String http) { CloseableHttpClient httpClient = HttpClients.createDefault(); StringBuilder entityStringBuilder = null; - try { - HttpGet get = new HttpGet(http); - try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - String line = null; - while ((line = bufferedReader.readLine()) != null) { - entityStringBuilder.append(line); - } + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + String line; + while ((line = bufferedReader.readLine()) != null) { + entityStringBuilder.append(line); } } } catch (Exception e) { - e.printStackTrace(); + logger.error(LogPrintUtil.print(e)); } finally { - try { - if (httpClient != null) { + if (httpClient != null) { + try { httpClient.close(); + } catch (IOException e) { + logger.error(LogPrintUtil.print(e)); } - } catch (IOException e) { - e.printStackTrace(); + } + if (bufferedReader != null) { +// bufferedReader.close(); + org.apache.commons.io.IOUtils.closeQuietly(bufferedReader); } } return entityStringBuilder.toString(); diff --git a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java deleted file mode 100644 index c51589b..0000000 --- a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java +++ /dev/null @@ -1,85 +0,0 @@ -package cn.ac.iie.utils.influxdb; - - -import cn.ac.iie.common.FlowWriteConfig; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; -import org.influxdb.dto.Point; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.function.LongFunction; - -/** - * 写入influxDB工具类 - * - * @author antlee - * @date 2018/8/17 - */ -public class InfluxDbUtils { - /** - * 原始日志写入数据中心kafka失败标识 - */ - public static void sendKafkaFail(int discarded) { - InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); - Point point1 = Point.measurement("SendKafkaFail") - .tag("topology", FlowWriteConfig.KAFKA_TOPIC) - .tag("hostname", getIp()) - .field("discarded", discarded) - .build(); - client.write("BusinessMonitor", "", point1); - } - - /** - * 原始日志写入数据中心kafka失败标识 - */ - public static void sendKafkaSuccess(Long complete) { - if (complete != 0) { - InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); - Point point1 = Point.measurement("SendKafkaSuccess") - .tag("topology", FlowWriteConfig.KAFKA_TOPIC) - .tag("hostname", getIp()) - .field("complete", complete) - .build(); - client.write("BusinessMonitor", "", point1); - } - } - - /** - * 记录对准失败次数-即内存中没有对应的key - * - * @param failure 对准失败量 - */ - public static void sendHBaseFailure(int failure) { - if (failure != 0) { - InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); - Point point1 = Point.measurement("sendHBaseFailure") - .tag("topic", FlowWriteConfig.KAFKA_TOPIC) - .field("failure", failure) - .build(); - client.write("BusinessMonitor", "", point1); - } - } - - /** - * 获取本机IP - * - * @return IP地址 - */ - private static String getIp() { - InetAddress addr; - try { - addr = InetAddress.getLocalHost(); - return addr.getHostAddress(); - } catch (UnknownHostException e) { - e.printStackTrace(); - return null; - } - } - - public static void main(String[] args) { - sendKafkaFail(100); -// sendKafkaSuccess(100L); - } - -} diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java index e81885a..9868076 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java @@ -1,5 +1,6 @@ package cn.ac.iie.utils.zookeeper; +import cn.ac.iie.utils.system.LogPrintUtil; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; @@ -56,7 +57,7 @@ public class ZookeeperUtils implements Watcher { } } } catch (KeeperException | InterruptedException e) { - logger.error("modify error Can't modify," + e.getMessage()); + logger.error("modify error Can't modify," + LogPrintUtil.print(e)); } finally { closeConn(); } @@ -87,7 +88,7 @@ public class ZookeeperUtils implements Watcher { zookeeper.close(); } } catch (InterruptedException e) { - e.printStackTrace(); + logger.error(LogPrintUtil.print(e)); } } @@ -104,8 +105,7 @@ public class ZookeeperUtils implements Watcher { byte[] resByte = zookeeper.getData(path, true, stat); result = new String(resByte); } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception"); - e.printStackTrace(); + logger.error("Get node information exception" + LogPrintUtil.print(e)); } return result; } @@ -129,7 +129,7 @@ public class ZookeeperUtils implements Watcher { logger.warn("Node already exists ! Don't need to create"); } } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); + logger.error(LogPrintUtil.print(e)); } finally { closeConn(); } diff --git a/src/test/java/cn/ac/iie/test/DomainTest.java b/src/test/java/cn/ac/iie/test/DomainTest.java index b4584e2..1e7c7a9 100644 --- a/src/test/java/cn/ac/iie/test/DomainTest.java +++ b/src/test/java/cn/ac/iie/test/DomainTest.java @@ -3,6 +3,9 @@ package cn.ac.iie.test; import com.zdjizhi.utils.FormatUtils; import org.junit.Test; +import java.util.Date; + + /** * @author qidaijie * @Package com.zdjizhi.flume @@ -15,5 +18,6 @@ public class DomainTest { public void getDomainTest() { String url = "array808.prod.do.dsp.mp.microsoft.com"; System.out.println(FormatUtils.getTopPrivateDomain(url)); + } } diff --git a/src/test/java/cn/ac/iie/test/FunctionIfTest.java b/src/test/java/cn/ac/iie/test/FunctionIfTest.java index 440d785..8e332ad 100644 --- a/src/test/java/cn/ac/iie/test/FunctionIfTest.java +++ b/src/test/java/cn/ac/iie/test/FunctionIfTest.java @@ -1,5 +1,9 @@ package cn.ac.iie.test; +import cn.ac.iie.common.FlowWriteConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.IpLookup; import org.junit.Test; /** @@ -9,11 +13,20 @@ import org.junit.Test; * @date 2020/12/2717:33 */ public class FunctionIfTest { + private static final Log log = LogFactory.get(); + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb") + .build(); @Test - public void IfTest() { - String s = "123"; - Object obj = s; + public void Test() { + String ip = "192.168.50.65"; + System.out.println(ipLookup.cityLookupDetail(ip)); + System.out.println(ipLookup.latLngLookup(ip)); + System.out.println(ipLookup.provinceLookup(ip)); + System.out.println(ipLookup.countryLookup(ip)); + System.out.println(ipLookup.cityLookup(ip)); } }