From cc155b0f78927226720c87bcc7535e082d59ac8f Mon Sep 17 00:00:00 2001 From: zhanghongqing Date: Mon, 28 Dec 2020 17:32:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=96=87=E4=BB=B6=E7=BC=BA?= =?UTF-8?q?=E7=9C=81=E4=BB=A5.tmp=20=E7=BB=93=E5=B0=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- safe-file-roll-sink/pom.xml | 4 +- .../java/cn/flume/sinks/SafePathManager.java | 3 +- .../cn/flume/sinks/SafeRollingFileSink.java | 109 +++++++++--------- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/safe-file-roll-sink/pom.xml b/safe-file-roll-sink/pom.xml index b8cd634..73cd273 100644 --- a/safe-file-roll-sink/pom.xml +++ b/safe-file-roll-sink/pom.xml @@ -4,9 +4,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - cn.huyanping.flume + cn.flume safe-roll-file-sink - 0.2 + 1.0 diff --git a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java index cc26ff0..c63e9d3 100644 --- a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java +++ b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java @@ -14,6 +14,7 @@ public class SafePathManager { private File baseDirectory; private AtomicInteger fileIndex; private String filePrefix = ""; + private static final String IN_USE_SUFFIX = ".tmp"; private File currentFile; @@ -24,7 +25,7 @@ public class SafePathManager { //重新生成微秒级文件名 seriesTimestamp = System.currentTimeMillis(); fileIndex = new AtomicInteger(); - currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp ); + currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp + IN_USE_SUFFIX ); return currentFile; } diff --git a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java index 84145d6..a609205 100644 --- a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java +++ b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java @@ -18,8 +18,7 @@ import java.util.concurrent.TimeUnit; public class SafeRollingFileSink extends AbstractSink implements Configurable { - private static final Logger logger = LoggerFactory - .getLogger(SafeRollingFileSink.class); + private static final Logger logger = LoggerFactory.getLogger(SafeRollingFileSink.class); private static final long defaultRollInterval = 30; private static final int defaultBatchSize = 100; @@ -92,35 +91,35 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { this.targetDirectory = new File(targetDirectory); //检查目录权限 - if(!this.directory.exists()){ - if(!this.directory.mkdirs()){ + if (!this.directory.exists()) { + if (!this.directory.mkdirs()) { throw new IllegalArgumentException("sink.directory is not a directory"); } - }else if(!this.directory.canWrite()){ + } else if (!this.directory.canWrite()) { throw new IllegalArgumentException("sink.directory can not write"); } //检查目标目录权限 - if(!this.targetDirectory.exists()){ - if(!this.targetDirectory.mkdirs()){ + if (!this.targetDirectory.exists()) { + if (!this.targetDirectory.mkdirs()) { throw new IllegalArgumentException("sink.directory is not a directory"); } - }else if(!this.targetDirectory.canWrite()){ + } else if (!this.targetDirectory.canWrite()) { throw new IllegalArgumentException("sink.directory can not write"); } //配置文件复制 - if(copyDirectory.length()>0 && useCopy){ + if (copyDirectory.length() > 0 && useCopy) { String[] copyDirectories = copyDirectory.split(","); this.copyDirectory = new File[copyDirectories.length]; - for(int i=0; i 0){ + if (rollInterval > 0) { rollService = Executors.newScheduledThreadPool( 1, @@ -147,12 +146,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { "rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d").build()); - /* - * Every N seconds, mark that it's time to rotate. We purposefully do NOT - * touch anything other than the indicator flag to avoid error handling - * issues (e.g. IO exceptions occuring in two different threads. - * Resist the urge to actually perform rotation in a separate thread! - */ + /* + * Every N seconds, mark that it's time to rotate. We purposefully do NOT + * touch anything other than the indicator flag to avoid error handling + * issues (e.g. IO exceptions occuring in two different threads. + * Resist the urge to actually perform rotation in a separate thread! + */ rollService.scheduleAtFixedRate(new Runnable() { public void run() { @@ -162,7 +161,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { } }, rollInterval, rollInterval, TimeUnit.SECONDS); - } else{ + } else { logger.info("RollInterval is not valid, file rolling will not happen."); } logger.info("RollingFileSink {} started.", getName()); @@ -181,14 +180,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { outputStream.close(); sinkCounter.incrementConnectionClosedCount(); shouldRotate = false; - if(useCopy){ - if(!copyLogFile(pathController.getCurrentFile())){ + if (useCopy) { + if (!copyLogFile(pathController.getCurrentFile())) { logger.error("Copy completed file failed"); throw new IOException("Copy completed file failed"); } } //文件名加后缀、移动文件 - if(!rename(pathController.getCurrentFile())){ + if (!rename(pathController.getCurrentFile())) { logger.error("Rename completed file failed"); throw new IOException("Rname completed file failed"); } @@ -237,15 +236,15 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { eventAttemptCounter++; serializer.write(event); - /* - * FIXME: Feature: Rotate on size and time by checking bytes written and - * setting shouldRotate = true if we're past a threshold. - */ + /* + * FIXME: Feature: Rotate on size and time by checking bytes written and + * setting shouldRotate = true if we're past a threshold. + */ - /* - * FIXME: Feature: Control flush interval based on time or number of - * events. For now, we're super-conservative and flush on each write. - */ + /* + * FIXME: Feature: Control flush interval based on time or number of + * events. For now, we're super-conservative and flush on each write. + */ } else { // No events found, request back-off semantics from runner result = Status.BACKOFF; @@ -280,14 +279,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { serializer.beforeClose(); outputStream.close(); sinkCounter.incrementConnectionClosedCount(); - if(useCopy){ - if(!copyLogFile(pathController.getCurrentFile())){ + if (useCopy) { + if (!copyLogFile(pathController.getCurrentFile())) { logger.error("Copy completed file failed"); throw new IOException("Copy completed file failed"); } } //文件名加后缀、移动文件 - if(!rename(pathController.getCurrentFile())){ + if (!rename(pathController.getCurrentFile())) { logger.error("Rename completed file failed"); throw new IOException("Ranme completed file failed"); } @@ -299,7 +298,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { serializer = null; } } - if(rollInterval > 0){ + if (rollInterval > 0) { rollService.shutdown(); while (!rollService.isTerminated()) { @@ -338,26 +337,30 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { logger.info("Delete empty file{}", current.getName()); return current.delete(); } - if(useFileSuffix && moveFile){ - return current.renameTo(new File(this.targetDirectory, current.getName() + fileSuffix)); - }else if(useFileSuffix){ - return current.renameTo(new File(this.directory, current.getName() + fileSuffix)); - }else if(moveFile){ - return current.renameTo(new File(this.targetDirectory, current.getName())); - }else{ + if (useFileSuffix && moveFile) { + return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current) + fileSuffix)); + } else if (useFileSuffix) { + return current.renameTo(new File(this.directory, removeFileSuffix(current) + fileSuffix)); + } else if (moveFile) { + return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current))); + } else { return true; } } + private String removeFileSuffix(File current) { + return current.getName().substring(0, current.getName().lastIndexOf(".")); + } + private boolean copyLogFile(File current) throws IOException { if (current.length() == 0L) { logger.info("Delete empty file{}", current.getName()); return current.delete(); } - for(File targetDir : this.copyDirectory){ - File targetFile = new File(targetDir.getAbsolutePath(), current.getName() + fileSuffix); + for (File targetDir : this.copyDirectory) { + File targetFile = new File(targetDir.getAbsolutePath(), removeFileSuffix(current) + fileSuffix); boolean copyResult = this.copyFile(current, targetFile, false); - if(!copyResult) return false; + if (!copyResult) return false; } return true; @@ -366,18 +369,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable { /** * 复制单个文件 * - * @param srcFile - * 待复制的文件名 - * @param destFile - * 目标文件名 - * @param overlay - * 如果目标文件存在,是否覆盖 + * @param srcFile 待复制的文件名 + * @param destFile 目标文件名 + * @param overlay 如果目标文件存在,是否覆盖 * @return 如果复制成功返回true,否则返回false */ - public boolean copyFile(File srcFile, File destFile, - boolean overlay) throws IOException { - - + public boolean copyFile(File srcFile, File destFile, boolean overlay) throws IOException { // 判断源文件是否存在 if (!srcFile.exists()) { throw new IOException("Copy file failed, source file does not exists");