diff --git a/pom.xml b/pom.xml
index 0b01062..0d4947d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
file-chunk-combiner
- 1.3.2
+ 1.3.3
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index d34203f..dbdff68 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -1,6 +1,8 @@
package com.zdjizhi.sink;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.text.CharPool;
+import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.*;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -23,6 +25,7 @@ import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
@@ -200,7 +203,7 @@ public class HosSink extends RichSinkFunction {
}
@Override
- public void invoke(FileChunk fileChunk, Context context) {
+ public void invoke(FileChunk fileChunk, Context context) throws RuntimeException {
synchronized (this) {
long currentTimeMillis = System.currentTimeMillis();
chunksInCounter.inc();
@@ -240,78 +243,73 @@ public class HosSink extends RichSinkFunction {
}
private void sendFileChunk(FileChunk fileChunk) {
- try {
- byte[] data = "".getBytes();
- if (fileChunk.getChunk() != null) {
- data = fileChunk.getChunk();
- }
- long chunkLength = data.length;
- if (batchSize > 0 && batchInterval > 0) {
- hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
- hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
- if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
- hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
- hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
- } else {
- hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
- hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
- }
- hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
- Map metaMap = fileChunk.getMeta();
- if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
- }
- }
- objectsMeta += hosMessage.toString() + ";";
- hosMessage.clear();
- objectsOffset += chunkLength + ";";
- byteList.add(data);
- chunkSize += chunkLength;
- chunksOutCounter.inc();
- bytesOutCounter.inc(chunkLength);
- calculateFileChunkMetrics(fileChunk);
- if (chunkSize >= batchSize) {
- sendBatchData();
- }
+ byte[] data = "".getBytes();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ }
+ long chunkLength = data.length;
+ if (batchSize > 0 && batchInterval > 0) {
+ hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
+ hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
} else {
- String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
- HttpPut httpPut = new HttpPut(url);
- httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
- httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
- httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
- String filename = fileChunk.getFileName();
- if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
- httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
- filename = filename + "." + fileChunk.getFileType();
- httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
- httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
- }
- if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
- httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
- httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
- } else {
- httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
- httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
- }
- httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
- Map metaMap = fileChunk.getMeta();
- if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
- }
- }
- httpPut.setEntity(new ByteArrayEntity(data));
- executeRequest(httpPut);
- chunksOutCounter.inc();
- bytesOutCounter.inc(chunkLength);
- calculateFileChunkMetrics(fileChunk);
+ hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
}
- } catch (Exception e) {
- LOG.error("put part to hos error.", e);
- errorChunksCounter.inc();
+ hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map metaMap = fileChunk.getMeta();
+ if (metaMap != null && !metaMap.isEmpty()) {
+ for (String meta : metaMap.keySet()) {
+ hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ objectsMeta += hosMessage.toString() + ";";
+ hosMessage.clear();
+ objectsOffset += chunkLength + ";";
+ byteList.add(data);
+ chunkSize += chunkLength;
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(chunkLength);
+ calculateFileChunkMetrics(fileChunk);
+ if (chunkSize >= batchSize) {
+ sendBatchData();
+ }
+ } else {
+ String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
+ HttpPut httpPut = new HttpPut(url);
+ httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
+ httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
+ httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
+ String filename = fileChunk.getFileName();
+ if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ filename = filename + "." + fileChunk.getFileType();
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ }
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
+ httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
+ } else {
+ httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
+ }
+ httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map metaMap = fileChunk.getMeta();
+ if (metaMap != null && !metaMap.isEmpty()) {
+ for (String meta : metaMap.keySet()) {
+ httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ httpPut.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPut);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(chunkLength);
+ calculateFileChunkMetrics(fileChunk);
}
}
@@ -324,7 +322,7 @@ public class HosSink extends RichSinkFunction {
httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset);
byte[][] bytes = new byte[byteList.size()][];
byteList.toArray(bytes);
- byte[] newData = ArrayUtil.addAll(bytes);
+ byte[] newData = PrimitiveArrayUtil.addAll(bytes);
httpPut.setEntity(new ByteArrayEntity(newData));
executeRequest(httpPut);
objectsMeta = "";
@@ -333,7 +331,7 @@ public class HosSink extends RichSinkFunction {
chunkSize = 0;
}
- private void executeRequest(HttpPut httpPut) {
+ private void executeRequest(HttpPut httpPut) throws RuntimeException {
if (isAsync) {
asyncHttpClient.execute(httpPut, new FutureCallback() {
@Override
@@ -341,19 +339,22 @@ public class HosSink extends RichSinkFunction {
try {
if (httpResponse.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
- LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error.", e);
+ LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
errorChunksCounter.inc();
}
}
@Override
public void failed(Exception ex) {
- LOG.error("put part to hos error.", ex);
+ LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex);
errorChunksCounter.inc();
+ if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
+ throw new RuntimeException(ex);
+ }
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
@@ -371,11 +372,11 @@ public class HosSink extends RichSinkFunction {
response = syncHttpClient.execute(httpPut);
if (response.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
- LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error.", e);
+ LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
errorChunksCounter.inc();
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
index 0ef9785..8aaecff 100644
--- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
+++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
@@ -22,6 +22,7 @@ import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
@@ -206,48 +207,42 @@ public class OssSinkByCaffeineCache extends RichSinkFunction {
}
private void sendFile(FileChunk fileChunk, Map metaMap) {
- String url = "";
- try {
- byte[] data;
- String fileType = fileChunk.getFileType();
- if (fileChunk.getChunk() != null) {
- data = fileChunk.getChunk();
- } else {
- data = "".getBytes();
- }
- String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : "";
- String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0";
- String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : "";
- String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : "";
- String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : "";
- String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : "";
- String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : "";
- String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : "";
- String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0";
- url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" +
- "cfg_id=" + policyId +
- "&file_id=" + fileId +
- "&file_type=" + fileType +
- "&found_time=" + foundTime +
- "&s_ip=" + serverIP +
- "&s_port=" + serverPort +
- "&d_ip=" + clientIP +
- "&d_port=" + clientPort +
- "&domain=" + domain +
- "&account=" + subscriberId);
- HttpPost httpPost = new HttpPost(url);
- httpPost.setEntity(new ByteArrayEntity(data));
- executeRequest(httpPost, url);
- chunksOutCounter.inc();
- bytesOutCounter.inc(data.length);
- calculateFileChunkMetrics(fileChunk, fileId);
- } catch (Exception e) {
- LOG.error("post file error. current url: " + url, e);
- errorChunksCounter.inc();
+ byte[] data;
+ String fileType = fileChunk.getFileType();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ } else {
+ data = "".getBytes();
}
+ String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : "";
+ String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0";
+ String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : "";
+ String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : "";
+ String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : "";
+ String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : "";
+ String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : "";
+ String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : "";
+ String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0";
+ String url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" +
+ "cfg_id=" + policyId +
+ "&file_id=" + fileId +
+ "&file_type=" + fileType +
+ "&found_time=" + foundTime +
+ "&s_ip=" + serverIP +
+ "&s_port=" + serverPort +
+ "&d_ip=" + clientIP +
+ "&d_port=" + clientPort +
+ "&domain=" + domain +
+ "&account=" + subscriberId);
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPost, url);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(data.length);
+ calculateFileChunkMetrics(fileChunk, fileId);
}
- private void executeRequest(HttpPost httpPost, String url) {
+ private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{
if (isAsync) {
asyncHttpClient.execute(httpPost, new FutureCallback() {
@Override
@@ -273,6 +268,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction {
public void failed(Exception ex) {
LOG.error("post file error. current url: " + url, ex);
errorChunksCounter.inc();
+ if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
+ throw new RuntimeException(ex);
+ }
}
@Override