diff --git a/safe-file-roll-sink/pom.xml b/safe-file-roll-sink/pom.xml
index b8cd634..73cd273 100644
--- a/safe-file-roll-sink/pom.xml
+++ b/safe-file-roll-sink/pom.xml
@@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- cn.huyanping.flume
+ cn.flume
safe-roll-file-sink
- 0.2
+ 1.0
diff --git a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java
index cc26ff0..c63e9d3 100644
--- a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java
+++ b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafePathManager.java
@@ -14,6 +14,7 @@ public class SafePathManager {
private File baseDirectory;
private AtomicInteger fileIndex;
private String filePrefix = "";
+ private static final String IN_USE_SUFFIX = ".tmp";
private File currentFile;
@@ -24,7 +25,7 @@ public class SafePathManager {
//重新生成微秒级文件名
seriesTimestamp = System.currentTimeMillis();
fileIndex = new AtomicInteger();
- currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp );
+ currentFile = new File(baseDirectory, filePrefix + fileIndex.incrementAndGet() + "-" + seriesTimestamp + IN_USE_SUFFIX );
return currentFile;
}
diff --git a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java
index 84145d6..a609205 100644
--- a/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java
+++ b/safe-file-roll-sink/src/main/java/cn/flume/sinks/SafeRollingFileSink.java
@@ -18,8 +18,7 @@ import java.util.concurrent.TimeUnit;
public class SafeRollingFileSink extends AbstractSink implements Configurable {
- private static final Logger logger = LoggerFactory
- .getLogger(SafeRollingFileSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(SafeRollingFileSink.class);
private static final long defaultRollInterval = 30;
private static final int defaultBatchSize = 100;
@@ -92,35 +91,35 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
this.targetDirectory = new File(targetDirectory);
//检查目录权限
- if(!this.directory.exists()){
- if(!this.directory.mkdirs()){
+ if (!this.directory.exists()) {
+ if (!this.directory.mkdirs()) {
throw new IllegalArgumentException("sink.directory is not a directory");
}
- }else if(!this.directory.canWrite()){
+ } else if (!this.directory.canWrite()) {
throw new IllegalArgumentException("sink.directory can not write");
}
//检查目标目录权限
- if(!this.targetDirectory.exists()){
- if(!this.targetDirectory.mkdirs()){
+ if (!this.targetDirectory.exists()) {
+ if (!this.targetDirectory.mkdirs()) {
throw new IllegalArgumentException("sink.directory is not a directory");
}
- }else if(!this.targetDirectory.canWrite()){
+ } else if (!this.targetDirectory.canWrite()) {
throw new IllegalArgumentException("sink.directory can not write");
}
//配置文件复制
- if(copyDirectory.length()>0 && useCopy){
+ if (copyDirectory.length() > 0 && useCopy) {
String[] copyDirectories = copyDirectory.split(",");
this.copyDirectory = new File[copyDirectories.length];
- for(int i=0; i 0){
+ if (rollInterval > 0) {
rollService = Executors.newScheduledThreadPool(
1,
@@ -147,12 +146,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
"rollingFileSink-roller-" +
Thread.currentThread().getId() + "-%d").build());
- /*
- * Every N seconds, mark that it's time to rotate. We purposefully do NOT
- * touch anything other than the indicator flag to avoid error handling
- * issues (e.g. IO exceptions occuring in two different threads.
- * Resist the urge to actually perform rotation in a separate thread!
- */
+ /*
+ * Every N seconds, mark that it's time to rotate. We purposefully do NOT
+ * touch anything other than the indicator flag to avoid error handling
+ * issues (e.g. IO exceptions occuring in two different threads.
+ * Resist the urge to actually perform rotation in a separate thread!
+ */
rollService.scheduleAtFixedRate(new Runnable() {
public void run() {
@@ -162,7 +161,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
}
}, rollInterval, rollInterval, TimeUnit.SECONDS);
- } else{
+ } else {
logger.info("RollInterval is not valid, file rolling will not happen.");
}
logger.info("RollingFileSink {} started.", getName());
@@ -181,14 +180,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
outputStream.close();
sinkCounter.incrementConnectionClosedCount();
shouldRotate = false;
- if(useCopy){
- if(!copyLogFile(pathController.getCurrentFile())){
+ if (useCopy) {
+ if (!copyLogFile(pathController.getCurrentFile())) {
logger.error("Copy completed file failed");
throw new IOException("Copy completed file failed");
}
}
//文件名加后缀、移动文件
- if(!rename(pathController.getCurrentFile())){
+ if (!rename(pathController.getCurrentFile())) {
logger.error("Rename completed file failed");
throw new IOException("Rname completed file failed");
}
@@ -237,15 +236,15 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
eventAttemptCounter++;
serializer.write(event);
- /*
- * FIXME: Feature: Rotate on size and time by checking bytes written and
- * setting shouldRotate = true if we're past a threshold.
- */
+ /*
+ * FIXME: Feature: Rotate on size and time by checking bytes written and
+ * setting shouldRotate = true if we're past a threshold.
+ */
- /*
- * FIXME: Feature: Control flush interval based on time or number of
- * events. For now, we're super-conservative and flush on each write.
- */
+ /*
+ * FIXME: Feature: Control flush interval based on time or number of
+ * events. For now, we're super-conservative and flush on each write.
+ */
} else {
// No events found, request back-off semantics from runner
result = Status.BACKOFF;
@@ -280,14 +279,14 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
serializer.beforeClose();
outputStream.close();
sinkCounter.incrementConnectionClosedCount();
- if(useCopy){
- if(!copyLogFile(pathController.getCurrentFile())){
+ if (useCopy) {
+ if (!copyLogFile(pathController.getCurrentFile())) {
logger.error("Copy completed file failed");
throw new IOException("Copy completed file failed");
}
}
//文件名加后缀、移动文件
- if(!rename(pathController.getCurrentFile())){
+ if (!rename(pathController.getCurrentFile())) {
logger.error("Rename completed file failed");
throw new IOException("Ranme completed file failed");
}
@@ -299,7 +298,7 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
serializer = null;
}
}
- if(rollInterval > 0){
+ if (rollInterval > 0) {
rollService.shutdown();
while (!rollService.isTerminated()) {
@@ -338,26 +337,30 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
logger.info("Delete empty file{}", current.getName());
return current.delete();
}
- if(useFileSuffix && moveFile){
- return current.renameTo(new File(this.targetDirectory, current.getName() + fileSuffix));
- }else if(useFileSuffix){
- return current.renameTo(new File(this.directory, current.getName() + fileSuffix));
- }else if(moveFile){
- return current.renameTo(new File(this.targetDirectory, current.getName()));
- }else{
+ if (useFileSuffix && moveFile) {
+ return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current) + fileSuffix));
+ } else if (useFileSuffix) {
+ return current.renameTo(new File(this.directory, removeFileSuffix(current) + fileSuffix));
+ } else if (moveFile) {
+ return current.renameTo(new File(this.targetDirectory, removeFileSuffix(current)));
+ } else {
return true;
}
}
+ private String removeFileSuffix(File current) {
+ return current.getName().substring(0, current.getName().lastIndexOf("."));
+ }
+
private boolean copyLogFile(File current) throws IOException {
if (current.length() == 0L) {
logger.info("Delete empty file{}", current.getName());
return current.delete();
}
- for(File targetDir : this.copyDirectory){
- File targetFile = new File(targetDir.getAbsolutePath(), current.getName() + fileSuffix);
+ for (File targetDir : this.copyDirectory) {
+ File targetFile = new File(targetDir.getAbsolutePath(), removeFileSuffix(current) + fileSuffix);
boolean copyResult = this.copyFile(current, targetFile, false);
- if(!copyResult) return false;
+ if (!copyResult) return false;
}
return true;
@@ -366,18 +369,12 @@ public class SafeRollingFileSink extends AbstractSink implements Configurable {
/**
* 复制单个文件
*
- * @param srcFile
- * 待复制的文件名
- * @param destFile
- * 目标文件名
- * @param overlay
- * 如果目标文件存在,是否覆盖
+ * @param srcFile 待复制的文件名
+ * @param destFile 目标文件名
+ * @param overlay 如果目标文件存在,是否覆盖
* @return 如果复制成功返回true,否则返回false
*/
- public boolean copyFile(File srcFile, File destFile,
- boolean overlay) throws IOException {
-
-
+ public boolean copyFile(File srcFile, File destFile, boolean overlay) throws IOException {
// 判断源文件是否存在
if (!srcFile.exists()) {
throw new IOException("Copy file failed, source file does not exists");