临时文件缺省以.tmp 结尾

This commit is contained in:
zhanghongqing
2020-12-28 17:32:01 +08:00
parent 7ef4eb8da8
commit cc155b0f78
3 changed files with 57 additions and 59 deletions

View File

@@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>cn.huyanping.flume</groupId> <groupId>cn.flume</groupId>
<artifactId>safe-roll-file-sink</artifactId> <artifactId>safe-roll-file-sink</artifactId>
<version>0.2</version> <version>1.0</version>
<dependencies> <dependencies>
<dependency> <dependency>

View File

@@ -14,6 +14,7 @@ public class SafePathManager {
private File baseDirectory; private File baseDirectory;
private AtomicInteger fileIndex; private AtomicInteger fileIndex;
private String filePrefix = ""; private String filePrefix = "";
private static final String IN_USE_SUFFIX = ".tmp";
private File currentFile; private File currentFile;
@@ -24,7 +25,7 @@ public class SafePathManager {
//重新生成微秒级文件名 //重新生成微秒级文件名
seriesTimestamp = System.currentTimeMillis(); seriesTimestamp = System.currentTimeMillis();
fileIndex = new AtomicInteger(); fileIndex = new AtomicInteger();
currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp ); currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp + IN_USE_SUFFIX );
return currentFile; return currentFile;
} }

View File

@@ -18,8 +18,7 @@ import java.util.concurrent.TimeUnit;
public class SafeRollingFileSink extends AbstractSink implements Configurable { public class SafeRollingFileSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory private static final Logger logger = LoggerFactory.getLogger(SafeRollingFileSink.class);
.getLogger(SafeRollingFileSink.class);
private static final long defaultRollInterval = 30; private static final long defaultRollInterval = 30;
private static final int defaultBatchSize = 100; private static final int defaultBatchSize = 100;
@@ -92,35 +91,35 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
this.targetDirectory = new File(targetDirectory); this.targetDirectory = new File(targetDirectory);
//检查目录权限 //检查目录权限
if(!this.directory.exists()){ if (!this.directory.exists()) {
if(!this.directory.mkdirs()){ if (!this.directory.mkdirs()) {
throw new IllegalArgumentException("sink.directory is not a directory"); throw new IllegalArgumentException("sink.directory is not a directory");
} }
}else if(!this.directory.canWrite()){ } else if (!this.directory.canWrite()) {
throw new IllegalArgumentException("sink.directory can not write"); throw new IllegalArgumentException("sink.directory can not write");
} }
//检查目标目录权限 //检查目标目录权限
if(!this.targetDirectory.exists()){ if (!this.targetDirectory.exists()) {
if(!this.targetDirectory.mkdirs()){ if (!this.targetDirectory.mkdirs()) {
throw new IllegalArgumentException("sink.directory is not a directory"); throw new IllegalArgumentException("sink.directory is not a directory");
} }
}else if(!this.targetDirectory.canWrite()){ } else if (!this.targetDirectory.canWrite()) {
throw new IllegalArgumentException("sink.directory can not write"); throw new IllegalArgumentException("sink.directory can not write");
} }
//配置文件复制 //配置文件复制
if(copyDirectory.length()>0 && useCopy){ if (copyDirectory.length() > 0 && useCopy) {
String[] copyDirectories = copyDirectory.split(","); String[] copyDirectories = copyDirectory.split(",");
this.copyDirectory = new File[copyDirectories.length]; this.copyDirectory = new File[copyDirectories.length];
for(int i=0; i<copyDirectories.length; i++){ for (int i = 0; i < copyDirectories.length; i++) {
this.copyDirectory[i] = new File(copyDirectories[i]); this.copyDirectory[i] = new File(copyDirectories[i]);
//检查目标目录权限 //检查目标目录权限
if(!this.copyDirectory[i].exists()){ if (!this.copyDirectory[i].exists()) {
if(!this.copyDirectory[i].mkdirs()){ if (!this.copyDirectory[i].mkdirs()) {
throw new IllegalArgumentException("sink.directory is not a directory"); throw new IllegalArgumentException("sink.directory is not a directory");
} }
}else if(!this.copyDirectory[i].canWrite()){ } else if (!this.copyDirectory[i].canWrite()) {
throw new IllegalArgumentException("sink.directory can not write"); throw new IllegalArgumentException("sink.directory can not write");
} }
} }
@@ -139,7 +138,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
pathController.setBaseDirectory(directory); pathController.setBaseDirectory(directory);
pathController.setFilePrefix(filePrefix); pathController.setFilePrefix(filePrefix);
if(rollInterval > 0){ if (rollInterval > 0) {
rollService = Executors.newScheduledThreadPool( rollService = Executors.newScheduledThreadPool(
1, 1,
@@ -147,12 +146,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
"rollingFileSink-roller-" + "rollingFileSink-roller-" +
Thread.currentThread().getId() + "-%d").build()); Thread.currentThread().getId() + "-%d").build());
/* /*
* Every N seconds, mark that it's time to rotate. We purposefully do NOT * Every N seconds, mark that it's time to rotate. We purposefully do NOT
* touch anything other than the indicator flag to avoid error handling * touch anything other than the indicator flag to avoid error handling
* issues (e.g. IO exceptions occuring in two different threads. * issues (e.g. IO exceptions occuring in two different threads.
* Resist the urge to actually perform rotation in a separate thread! * Resist the urge to actually perform rotation in a separate thread!
*/ */
rollService.scheduleAtFixedRate(new Runnable() { rollService.scheduleAtFixedRate(new Runnable() {
public void run() { public void run() {
@@ -162,7 +161,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
} }
}, rollInterval, rollInterval, TimeUnit.SECONDS); }, rollInterval, rollInterval, TimeUnit.SECONDS);
} else{ } else {
logger.info("RollInterval is not valid, file rolling will not happen."); logger.info("RollInterval is not valid, file rolling will not happen.");
} }
logger.info("RollingFileSink {} started.", getName()); logger.info("RollingFileSink {} started.", getName());
@@ -181,14 +180,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
outputStream.close(); outputStream.close();
sinkCounter.incrementConnectionClosedCount(); sinkCounter.incrementConnectionClosedCount();
shouldRotate = false; shouldRotate = false;
if(useCopy){ if (useCopy) {
if(!copyLogFile(pathController.getCurrentFile())){ if (!copyLogFile(pathController.getCurrentFile())) {
logger.error("Copy completed file failed"); logger.error("Copy completed file failed");
throw new IOException("Copy completed file failed"); throw new IOException("Copy completed file failed");
} }
} }
//文件名加后缀、移动文件 //文件名加后缀、移动文件
if(!rename(pathController.getCurrentFile())){ if (!rename(pathController.getCurrentFile())) {
logger.error("Rename completed file failed"); logger.error("Rename completed file failed");
throw new IOException("Rname completed file failed"); throw new IOException("Rname completed file failed");
} }
@@ -237,15 +236,15 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
eventAttemptCounter++; eventAttemptCounter++;
serializer.write(event); serializer.write(event);
/* /*
* FIXME: Feature: Rotate on size and time by checking bytes written and * FIXME: Feature: Rotate on size and time by checking bytes written and
* setting shouldRotate = true if we're past a threshold. * setting shouldRotate = true if we're past a threshold.
*/ */
/* /*
* FIXME: Feature: Control flush interval based on time or number of * FIXME: Feature: Control flush interval based on time or number of
* events. For now, we're super-conservative and flush on each write. * events. For now, we're super-conservative and flush on each write.
*/ */
} else { } else {
// No events found, request back-off semantics from runner // No events found, request back-off semantics from runner
result = Status.BACKOFF; result = Status.BACKOFF;
@@ -280,14 +279,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
serializer.beforeClose(); serializer.beforeClose();
outputStream.close(); outputStream.close();
sinkCounter.incrementConnectionClosedCount(); sinkCounter.incrementConnectionClosedCount();
if(useCopy){ if (useCopy) {
if(!copyLogFile(pathController.getCurrentFile())){ if (!copyLogFile(pathController.getCurrentFile())) {
logger.error("Copy completed file failed"); logger.error("Copy completed file failed");
throw new IOException("Copy completed file failed"); throw new IOException("Copy completed file failed");
} }
} }
//文件名加后缀、移动文件 //文件名加后缀、移动文件
if(!rename(pathController.getCurrentFile())){ if (!rename(pathController.getCurrentFile())) {
logger.error("Rename completed file failed"); logger.error("Rename completed file failed");
throw new IOException("Ranme completed file failed"); throw new IOException("Ranme completed file failed");
} }
@@ -299,7 +298,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
serializer = null; serializer = null;
} }
} }
if(rollInterval > 0){ if (rollInterval > 0) {
rollService.shutdown(); rollService.shutdown();
while (!rollService.isTerminated()) { while (!rollService.isTerminated()) {
@@ -338,26 +337,30 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
logger.info("Delete empty file{}", current.getName()); logger.info("Delete empty file{}", current.getName());
return current.delete(); return current.delete();
} }
if(useFileSuffix && moveFile){ if (useFileSuffix && moveFile) {
return current.renameTo(new File(this.targetDirectory, current.getName() + fileSuffix)); return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current) + fileSuffix));
}else if(useFileSuffix){ } else if (useFileSuffix) {
return current.renameTo(new File(this.directory, current.getName() + fileSuffix)); return current.renameTo(new File(this.directory, removeFileSuffix(current) + fileSuffix));
}else if(moveFile){ } else if (moveFile) {
return current.renameTo(new File(this.targetDirectory, current.getName())); return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current)));
}else{ } else {
return true; return true;
} }
} }
private String removeFileSuffix(File current) {
return current.getName().substring(0, current.getName().lastIndexOf("."));
}
private boolean copyLogFile(File current) throws IOException { private boolean copyLogFile(File current) throws IOException {
if (current.length() == 0L) { if (current.length() == 0L) {
logger.info("Delete empty file{}", current.getName()); logger.info("Delete empty file{}", current.getName());
return current.delete(); return current.delete();
} }
for(File targetDir : this.copyDirectory){ for (File targetDir : this.copyDirectory) {
File targetFile = new File(targetDir.getAbsolutePath(), current.getName() + fileSuffix); File targetFile = new File(targetDir.getAbsolutePath(), removeFileSuffix(current) + fileSuffix);
boolean copyResult = this.copyFile(current, targetFile, false); boolean copyResult = this.copyFile(current, targetFile, false);
if(!copyResult) return false; if (!copyResult) return false;
} }
return true; return true;
@@ -366,18 +369,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
/** /**
* 复制单个文件 * 复制单个文件
* *
* @param srcFile * @param srcFile 待复制的文件名
* 待复制的文件名 * @param destFile 目标文件名
* @param destFile * @param overlay 如果目标文件存在,是否覆盖
* 目标文件名
* @param overlay
* 如果目标文件存在,是否覆盖
* @return 如果复制成功返回true否则返回false * @return 如果复制成功返回true否则返回false
*/ */
public boolean copyFile(File srcFile, File destFile, public boolean copyFile(File srcFile, File destFile, boolean overlay) throws IOException {
boolean overlay) throws IOException {
// 判断源文件是否存在 // 判断源文件是否存在
if (!srcFile.exists()) { if (!srcFile.exists()) {
throw new IOException("Copy file failed, source file does not exists"); throw new IOException("Copy file failed, source file does not exists");