diff --git a/pom.xml b/pom.xml index ea6236c..aac57ba 100644 --- a/pom.xml +++ b/pom.xml @@ -33,9 +33,8 @@ org.apache.maven.plugins maven-compiler-plugin - 7 - 7 - + 8 + 8 diff --git a/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/src/main/java/cn/ac/iie/config/ApplicationConfig.java index 50fe885..f8d799b 100644 --- a/src/main/java/cn/ac/iie/config/ApplicationConfig.java +++ b/src/main/java/cn/ac/iie/config/ApplicationConfig.java @@ -13,6 +13,7 @@ public class ApplicationConfig { public static final Integer OFFLINE_SCHEDULE_SECOND = ConfigUtils.getIntProperty("offline.schedule.second"); public static final String OFFLINE_IMPORT_PATH = ConfigUtils.getStringProperty("offline.import.path"); public static final String OFFLINE_OUTPUT_PATH = ConfigUtils.getStringProperty("offline.output.path"); + public static final Integer OFFLINE_READIN_BATCH = ConfigUtils.getIntProperty("offline.readin.batch"); public static final Boolean UPDATE_SWITCH = ConfigUtils.getBooleanProperty("update.switch"); @@ -27,7 +28,7 @@ public class ApplicationConfig { public static final Integer LOG_OFFLINE_NUMBER = ConfigUtils.getIntProperty("log.offline.number"); public static final String LOG_BC_QUERY_REPORT_FILE = ConfigUtils.getStringProperty("log.bc.query.report.file"); - public static final Integer MAXIMUM_URL_ONCE = ConfigUtils.getIntProperty("maximum.url.once"); + public static final Integer MAXIMUM_URL_ONCE_BC_QUERY = ConfigUtils.getIntProperty("maximum.url.once.bc.query"); } diff --git a/src/main/java/cn/ac/iie/dao/BaseMariaDB.java b/src/main/java/cn/ac/iie/dao/BaseMariaDB.java index ebf4677..c5fad03 100644 --- a/src/main/java/cn/ac/iie/dao/BaseMariaDB.java +++ b/src/main/java/cn/ac/iie/dao/BaseMariaDB.java @@ -1,6 +1,5 @@ package cn.ac.iie.dao; - import cn.ac.iie.config.MariaDBConfig; import cn.ac.iie.utils.TimeUtils; import org.apache.log4j.Logger; @@ -10,6 +9,7 @@ import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; /** * @author yjy @@ -67,11 +67,17 @@ public class BaseMariaDB { return querySqlExecute(getQueryRecordSql(fqdn)); } + public ResultSet getDatabaseRecord(List fqdns){ + return querySqlExecute(getQueryRecordSql(fqdns)); + } + public void writeSqlExecute(String sql){ try { statement.executeUpdate(sql); + } catch (SQLIntegrityConstraintViolationException e){ + LOG.error("Duplicated entry for key 'PRIMARY'"); } catch (SQLException exception) { - LOG.debug("Sql : " + sql); + LOG.error("Sql : " + sql); exception.printStackTrace(); } } @@ -150,10 +156,18 @@ public class BaseMariaDB { .append(MariaDBConfig.MARIADB_TABLE).append(' ') .append(" WHERE fqdn = '").append(fqdn).append('\''); - String resSql = sql.toString(); - resSql = resSql.replace("'null'", "null"); + return sql.toString(); + } - return resSql; + public String getQueryRecordSql(List fqdns){ + StringBuilder sql = new StringBuilder("SELECT * FROM "); + String queryFqdns = fqdns.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")); + + sql.append(MariaDBConfig.MARIADB_DATABASE).append(".") + .append(MariaDBConfig.MARIADB_TABLE).append(' ') + .append(" WHERE fqdn in (").append(queryFqdns).append(")"); + + return sql.toString(); } public static String getExpiredRecordSql(){ diff --git a/src/main/java/cn/ac/iie/service/OfflineTask.java b/src/main/java/cn/ac/iie/service/OfflineTask.java index 0a495a8..adb2fa9 100644 --- a/src/main/java/cn/ac/iie/service/OfflineTask.java +++ b/src/main/java/cn/ac/iie/service/OfflineTask.java @@ -5,8 +5,8 @@ import cn.ac.iie.dao.BaseMariaDB; import cn.ac.iie.dao.FqdnFile; import cn.ac.iie.utils.BrightCloudUtils; import cn.ac.iie.utils.FileUtils; -import cn.ac.iie.utils.MariaDBConnect; import cn.ac.iie.utils.LogUtils; +import cn.ac.iie.utils.MariaDBConnect; import com.alibaba.fastjson.JSONObject; import org.apache.log4j.Logger; @@ -17,7 +17,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.text.DecimalFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.TimerTask; /** * @author yjy @@ -38,7 +40,6 @@ public class OfflineTask extends TimerTask { } private void runTask() throws SQLException, IOException { - List fileNames = catpureNewFiles(); List fqdns; @@ -48,153 +49,154 @@ public class OfflineTask extends TimerTask { BrightCloudUtils brightCloudUtils = new BrightCloudUtils(); List queryTypes = BrightCloudUtils.getQueryTypes(); - for (String fileName: fileNames){ + for (String fileName : fileNames) { File importFile = new File(fileName); String importFileName = FileUtils.getFileName(importFile); + InputStreamReader inputStreamReader = new InputStreamReader( + new FileInputStream(importFile), "GBK"); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader); + + Long fileLineCount = FileUtils.getFileLineNum(importFile); + + // 输出结果保存文件 LOG.info("[Offline import file query]-" + importFileName + ": File Found."); String outputFileName = - importFileName.substring(0, importFileName.length()-ApplicationConfig.OFFLINE_IMPORT_FILENAME_SUFFIX.length()) - + ApplicationConfig.OFFLINE_OUTPUT_FILENAME_SUFFIX; - // TODO 遍历处理消耗太大 - fqdns = FileUtils.readTxtFileIntoStringArrList(importFile.toString()); - fqdns = SingleTermTask.getCheckedFqdn(fqdns); + importFileName.substring(0, importFileName.length() - ApplicationConfig.OFFLINE_IMPORT_FILENAME_SUFFIX.length()) + + ApplicationConfig.OFFLINE_OUTPUT_FILENAME_SUFFIX; + File outputFile = new File(ApplicationConfig.OFFLINE_OUTPUT_PATH + "/" + outputFileName); + if (!outputFile.exists()) { + FileUtils.createFile(new File(ApplicationConfig.OFFLINE_OUTPUT_PATH), outputFileName); + } + OutputStream outStream = new FileOutputStream(outputFile); + OutputStreamWriter outWriter = new OutputStreamWriter(outStream, StandardCharsets.UTF_8); + // 添加表头 + outWriter.write(FqdnFile.getKeys() + "\n"); - long standardFqdnNum = fqdns.size(); + List toQueryBC = new ArrayList<>(); + + long standardFqdnNum = 0; + long complNum = 0; long dbQueryNum = 0; long bcQueryNum = 0; long failQueryNum = 0; long effecResNum = 0; long noLabelNum = 0; - if (standardFqdnNum>0){ - // 创建结果保存文件 - File outputFile = new File(ApplicationConfig.OFFLINE_OUTPUT_PATH + "/" + outputFileName); - if (!outputFile.exists()){ - FileUtils.createFile(new File(ApplicationConfig.OFFLINE_OUTPUT_PATH), outputFileName); - } + // 批量读取数据,读取时去空行&去空格 + LOG.info("[Offline import file query]-" + importFileName + " Progress: 0.00%"); + while ((fqdns = FileUtils.getBatchFqdnReadIn(bufferedReader, ApplicationConfig.OFFLINE_READIN_BATCH)).size()>0) { + // 校验格式&去重 + fqdns = SingleTermTask.getCheckedFqdn(fqdns); + standardFqdnNum += fqdns.size(); - OutputStream outStream = new FileOutputStream(outputFile); - OutputStreamWriter outWriter = new OutputStreamWriter(outStream, StandardCharsets.UTF_8); - outWriter.write(FqdnFile.getKeys() + "\n"); - - List fqdnToQuery = new ArrayList<>(); - // 遍历列表域名 - LOG.info("[Offline import file query]-" + importFileName + " Progress: 0%"); - for (int index=0; index dbQueryFqdns = new ArrayList<>(); + while (dbResult.next()) { + try { + FqdnFile fqdnFile = SingleTermTask.ResSet2FqdnFile(dbResult); + dbQueryFqdns.add(fqdnFile.getFqdn()); outWriter.write(fqdnFile.getValues() + "\n"); - if (fqdnFile.getQuery_success().equals(false)){ - failQueryNum += 1; - } else if (fqdnFile.getCategory_id().equals(0)){ - noLabelNum += 1; - } else { - effecResNum += 1; + + // 计数 + dbQueryNum += 1; + complNum += 1; + failQueryNum = fqdnFile.getQuery_success().equals(false) ? failQueryNum + 1 : failQueryNum; + noLabelNum = fqdnFile.getCategory_id().equals(0) ? noLabelNum + 1 : noLabelNum; + effecResNum = (fqdnFile.getQuery_success().equals(false) | fqdnFile.getCategory_id().equals(0)) ? + effecResNum : effecResNum+1; + + // 打印进度日志 + if (complNum > 0 && complNum % ApplicationConfig.LOG_OFFLINE_NUMBER == 0) { + String percent = new DecimalFormat("##.00%").format((float) complNum / fileLineCount); + LOG.info("[Offline import file query]-" + importFileName + " Progress:" + percent); } - } else { - // 查bc - fqdnToQuery.add(fqdn); - bcQueryNum = bcQueryNum + 1; - // 缓存满 - if (fqdnToQuery.size() == ApplicationConfig.MAXIMUM_URL_ONCE){ - // 批量查 - JSONObject resObj = brightCloudUtils.getQueryResults(fqdnToQuery); - List fqdnFiles = brightCloudUtils.responseSparse(resObj); - assert fqdnFiles.size()>0; - - // 存数据库 - mariaDB.insertRecords(fqdnFiles); - - // 写入output - for (FqdnFile tmpFile:fqdnFiles){ - outWriter.write(tmpFile.getValues() + "\n"); - if (tmpFile.getQuery_success().equals(false)){ - failQueryNum += 1; - } else if (tmpFile.getCategory_id().equals(0)){ - noLabelNum += 1; - } else { - effecResNum += 1; - } - } - -// // 打印处理进度日志 -// String percent = new DecimalFormat("##.0%").format((float)index/standardFqdnNum); -// LOG.info("[Offline import file query]-" + importFileName + " Progress:" + percent); - - // 缓存复位 - fqdnToQuery = new ArrayList<>(); - } - } - - // 打印处理进度至日志 - if (index>0 && index % ApplicationConfig.LOG_OFFLINE_NUMBER==0){ - String percent = new DecimalFormat("##.0%").format((float)index/standardFqdnNum); - LOG.info("[Offline import file query]-" + importFileName + " Progress:" + percent); + } catch (Exception e) { + LOG.error("[Offline import file query]-" + importFileName + ": Wrong in database query"); + e.printStackTrace(); } } - // 剩余待查bc - if (fqdnToQuery.size()>0){ - JSONObject resObj = brightCloudUtils.getQueryResults(fqdnToQuery); + + // 添加bc查询目标 + fqdns.removeAll(dbQueryFqdns); + toQueryBC.addAll(fqdns); + while (toQueryBC.size() > ApplicationConfig.MAXIMUM_URL_ONCE_BC_QUERY) { +// LOG.debug("Execute batch bc query..."); + JSONObject resObj = brightCloudUtils.getQueryResults(toQueryBC.subList(0, ApplicationConfig.MAXIMUM_URL_ONCE_BC_QUERY)); List fqdnFiles = brightCloudUtils.responseSparse(resObj); - assert fqdnFiles.size()>0; + assert fqdnFiles.size() > 0; // 存数据库 mariaDB.insertRecords(fqdnFiles); - // 写入output - for (FqdnFile tmpFile:fqdnFiles){ + for (FqdnFile tmpFile : fqdnFiles) { + // 写入output outWriter.write(tmpFile.getValues() + "\n"); - if (tmpFile.getQuery_success().equals(false)){ - failQueryNum += 1; - } else if (tmpFile.getCategory_id().equals(0)){ - noLabelNum += 1; - } else { - effecResNum += 1; + // 计数 + bcQueryNum += 1; + complNum += 1; + failQueryNum = tmpFile.getQuery_success().equals(false) ? failQueryNum + 1 : failQueryNum; + noLabelNum = tmpFile.getCategory_id().equals(0) ? noLabelNum + 1 : noLabelNum; + effecResNum = (tmpFile.getQuery_success().equals(false) | tmpFile.getCategory_id().equals(0)) ? effecResNum : effecResNum+1; + + // 打印进度日志 + if (complNum > 0 && complNum % ApplicationConfig.LOG_OFFLINE_NUMBER == 0) { + String percent = new DecimalFormat("##.00%").format((float) complNum / fileLineCount); + LOG.info("[Offline import file query]-" + importFileName + " Progress:" + percent); } } + toQueryBC = toQueryBC.subList(ApplicationConfig.MAXIMUM_URL_ONCE_BC_QUERY, toQueryBC.size()); } - - LOG.info("[Offline import file query]-" + importFileName + " Progress: 100%"); - // 打印处理结果至日志 - LOG.info("[Offline import file query]-" + importFileName + " " - + "Query result: submit " + standardFqdnNum+" valid fqdns, " - + dbQueryNum + " (" + new DecimalFormat("##.0%").format((float)dbQueryNum/standardFqdnNum) + ")" + " results from database," - + bcQueryNum + " (" + new DecimalFormat("##.0%").format((float)bcQueryNum/standardFqdnNum) + ")" + " results from bright cloud. " - + effecResNum + " (" + new DecimalFormat("##.0%").format((float)effecResNum/standardFqdnNum) + ")" + " effective results," - + failQueryNum + " (" + new DecimalFormat("##.0%").format((float)failQueryNum/standardFqdnNum) + ")" + " failed queries," - + noLabelNum + " (" + new DecimalFormat("##.0%").format((float)noLabelNum/standardFqdnNum) + ")" + " unlabeled results"); - LOG.info("[Offline import file query]-" + importFileName + " Results saved in " + outputFile.toString()); - // 打印查询服务调用记录日志 - if (bcQueryNum > 0){ - OutputStream bcQueryLogStream = new FileOutputStream(ApplicationConfig.LOG_BC_QUERY_REPORT_FILE, true); - OutputStreamWriter bcQueryLogWriter = new OutputStreamWriter(bcQueryLogStream, StandardCharsets.UTF_8); - for (String type : queryTypes) { - java.sql.Date d = new java.sql.Date(System.currentTimeMillis()); - bcQueryLogWriter.write(d + "," + "OfflineTask," + importFileName + "," + type + "," + bcQueryNum + "\n"); - } - - FileUtils.writerClose(bcQueryLogWriter, bcQueryLogStream); - } - - FileUtils.writerClose(outWriter, outStream); - } else { - continue; } - // 查询结束修改后缀 - importFile.renameTo(new File( - importFile.toString().substring(0, importFile.toString().length()-ApplicationConfig.OFFLINE_IMPORT_FILENAME_SUFFIX.length()) + // 完成剩余bc查询 + if (toQueryBC.size()>0){ + JSONObject resObj = brightCloudUtils.getQueryResults(toQueryBC); + List fqdnFiles = brightCloudUtils.responseSparse(resObj); + assert fqdnFiles.size() > 0; + // 存数据库 + mariaDB.insertRecords(fqdnFiles); + for (FqdnFile tmpFile : fqdnFiles) { + // 写入output + outWriter.write(tmpFile.getValues() + "\n"); + // 计数 + bcQueryNum += 1; + complNum += 1; + failQueryNum = tmpFile.getQuery_success().equals(false) ? failQueryNum + 1 : failQueryNum; + noLabelNum = tmpFile.getCategory_id().equals(0) ? noLabelNum + 1 : noLabelNum; + effecResNum = (tmpFile.getQuery_success().equals(true) && tmpFile.getCategory_id().equals(1)) ? effecResNum + 1 : effecResNum; + } + } + + LOG.info("[Offline import file query]-" + importFileName + " Progress: 100.00%"); + // 打印处理结果至日志 + LOG.info("[Offline import file query]-" + importFileName + " " + + "Query result: submit " + standardFqdnNum + " valid fqdns, " + + dbQueryNum + " (" + new DecimalFormat("##.0%").format((float) dbQueryNum / standardFqdnNum) + ")" + " results from database," + + bcQueryNum + " (" + new DecimalFormat("##.0%").format((float) bcQueryNum / standardFqdnNum) + ")" + " results from bright cloud. " + + effecResNum + " (" + new DecimalFormat("##.0%").format((float) effecResNum / standardFqdnNum) + ")" + " effective results," + + failQueryNum + " (" + new DecimalFormat("##.0%").format((float) failQueryNum / standardFqdnNum) + ")" + " failed queries," + + noLabelNum + " (" + new DecimalFormat("##.0%").format((float) noLabelNum / standardFqdnNum) + ")" + " unlabeled results"); + LOG.info("[Offline import file query]-" + importFileName + " Results saved in " + outputFile.toString()); + // 打印查询服务调用记录日志 + if (bcQueryNum > 0) { + OutputStream bcQueryLogStream = new FileOutputStream(ApplicationConfig.LOG_BC_QUERY_REPORT_FILE, true); + OutputStreamWriter bcQueryLogWriter = new OutputStreamWriter(bcQueryLogStream, StandardCharsets.UTF_8); + for (String type : queryTypes) { + java.sql.Date d = new java.sql.Date(System.currentTimeMillis()); + bcQueryLogWriter.write(d + "," + "OfflineTask," + importFileName + "," + type + "," + bcQueryNum + "\n"); + } + + FileUtils.writerClose(bcQueryLogWriter, bcQueryLogStream); + } + + FileUtils.writerClose(outWriter, outStream); + + //查询结束修改后缀 + importFile.renameTo(new File(importFile.toString().substring(0, importFile.toString().length() - ApplicationConfig.OFFLINE_IMPORT_FILENAME_SUFFIX.length()) + ApplicationConfig.OFFLINE_IMPORT_FILEDONE_SUFFIX)); + FileUtils.readerClose(bufferedReader, inputStreamReader); } MariaDBConnect.close(mariaStat, mariaConn); @@ -226,12 +228,6 @@ public class OfflineTask extends TimerTask { } return newFiles; } - - private Boolean checkFileName(String fileName) { - boolean isStandard = true; - if (!fileName.endsWith(ApplicationConfig.OFFLINE_IMPORT_FILENAME_SUFFIX)) { - isStandard = false; - } - return isStandard; - } } + + diff --git a/src/main/java/cn/ac/iie/service/SingleTermTask.java b/src/main/java/cn/ac/iie/service/SingleTermTask.java index 5d938d1..d7978fa 100644 --- a/src/main/java/cn/ac/iie/service/SingleTermTask.java +++ b/src/main/java/cn/ac/iie/service/SingleTermTask.java @@ -1,8 +1,10 @@ package cn.ac.iie.service; import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseMariaDB; import cn.ac.iie.dao.FqdnFile; import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; import java.sql.ResultSet; import java.sql.SQLException; @@ -18,6 +20,7 @@ import java.util.regex.Pattern; * @date 2021/2/25 2:40 下午 */ public class SingleTermTask { + private static final Logger LOG = Logger.getLogger(SingleTermTask.class); /** * 提取三级域名 @@ -71,6 +74,8 @@ public class SingleTermTask { // 去重 & 校验 if (isValidDomain(fqdn) && !res.contains(fqdn)){ res.add(fqdn.toLowerCase()); + } else { + LOG.debug("Bad or duplicated fqdn:" + fqdn); } } return res; diff --git a/src/main/java/cn/ac/iie/service/UpdateTask.java b/src/main/java/cn/ac/iie/service/UpdateTask.java index a6bba9e..8290016 100644 --- a/src/main/java/cn/ac/iie/service/UpdateTask.java +++ b/src/main/java/cn/ac/iie/service/UpdateTask.java @@ -10,13 +10,15 @@ import cn.ac.iie.utils.MariaDBConnect; import com.alibaba.fastjson.JSONObject; import org.apache.log4j.Logger; -import java.io.*; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.text.DecimalFormat; import java.util.ArrayList; import java.util.List; import java.util.TimerTask; @@ -62,8 +64,7 @@ public class UpdateTask extends TimerTask { updateFqdns.add(unlabeledSet.getString("fqdn")); } long unlabeledNum = updateFqdns.size() - expiredNum; - - //TODO 分批查询 + if (updateFqdns.size()>0){ JSONObject jsonObj = brightCloudUtils.getQueryResults(updateFqdns); List updateFiles = brightCloudUtils.responseSparse(jsonObj); diff --git a/src/main/java/cn/ac/iie/utils/BrightCloudUtils.java b/src/main/java/cn/ac/iie/utils/BrightCloudUtils.java index 45b68e3..7e564d1 100644 --- a/src/main/java/cn/ac/iie/utils/BrightCloudUtils.java +++ b/src/main/java/cn/ac/iie/utils/BrightCloudUtils.java @@ -1,11 +1,10 @@ package cn.ac.iie.utils; -import cn.ac.iie.dao.FqdnFile; import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.FqdnFile; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.google.common.collect.Lists; import org.apache.log4j.Logger; import java.io.*; @@ -55,7 +54,7 @@ public class BrightCloudUtils { private final HashMap> catId2Info = new HashMap<>(); public JSONObject getQueryResults (List urls) { - if (urls.size()>ApplicationConfig.MAXIMUM_URL_ONCE){ + if (urls.size()>ApplicationConfig.MAXIMUM_URL_ONCE_BC_QUERY){ LOG.warn("Too many urls in a http post request!"); } JSONObject jsonRes = null; diff --git a/src/main/java/cn/ac/iie/utils/FileUtils.java b/src/main/java/cn/ac/iie/utils/FileUtils.java index 90df21b..dc68f11 100644 --- a/src/main/java/cn/ac/iie/utils/FileUtils.java +++ b/src/main/java/cn/ac/iie/utils/FileUtils.java @@ -1,11 +1,13 @@ package cn.ac.iie.utils; +import cn.ac.iie.config.ApplicationConfig; import org.apache.log4j.Logger; import java.io.*; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * @author yjy @@ -52,6 +54,22 @@ public class FileUtils { return list; } + public static List getBatchFqdnReadIn(BufferedReader bufferedReader, int batchSize){ + List list = new ArrayList<>(); + String lineTxt; + try{ + while ((lineTxt = bufferedReader.readLine()) != null && list.size()