From 94383ba3782b2cc3ecc58e48c818ec818bddf423 Mon Sep 17 00:00:00 2001 From: houjinchuan Date: Thu, 11 Jul 2024 10:07:05 +0800 Subject: [PATCH] =?UTF-8?q?[TSG-21698]=E4=BF=AE=E5=A4=8D=E4=BD=BF=E7=94=A8?= =?UTF-8?q?Async=20HttpClien=E5=8F=91=E7=94=9F=E5=A0=86=E5=A4=96=E5=86=85?= =?UTF-8?q?=E5=AD=98=E6=BA=A2=E5=87=BA=E6=97=B6=E4=BB=BB=E5=8A=A1=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E9=87=8D=E5=90=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- src/main/java/com/zdjizhi/sink/HosSink.java | 157 +++++++++--------- .../zdjizhi/sink/OssSinkByCaffeineCache.java | 76 +++++---- 3 files changed, 117 insertions(+), 118 deletions(-) diff --git a/pom.xml b/pom.xml index 0b01062..0d4947d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi file-chunk-combiner - 1.3.2 + 1.3.3 diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index d34203f..dbdff68 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -1,6 +1,8 @@ package com.zdjizhi.sink; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.text.CharPool; +import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.core.util.*; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -23,6 +25,7 @@ import org.apache.http.conn.HttpHostConnectException; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.util.EntityUtils; import java.io.IOException; @@ -200,7 +203,7 @@ public class HosSink extends RichSinkFunction { } @Override - public void invoke(FileChunk fileChunk, Context context) { + public void invoke(FileChunk fileChunk, Context context) throws RuntimeException { synchronized (this) { long currentTimeMillis = System.currentTimeMillis(); chunksInCounter.inc(); @@ -240,78 +243,73 @@ public class HosSink extends RichSinkFunction { } private void sendFileChunk(FileChunk fileChunk) { - try { - byte[] data = "".getBytes(); - if (fileChunk.getChunk() != null) { - data = fileChunk.getChunk(); - } - long chunkLength = data.length; - if (batchSize > 0 && batchInterval > 0) { - hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType()); - hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid()); - if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { - hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + ""); - hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); - } else { - hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); - hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); - } - hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); - Map metaMap = fileChunk.getMeta(); - if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""); - } - } - objectsMeta += hosMessage.toString() + ";"; - hosMessage.clear(); - objectsOffset += chunkLength + ";"; - byteList.add(data); - chunkSize += chunkLength; - chunksOutCounter.inc(); - bytesOutCounter.inc(chunkLength); - calculateFileChunkMetrics(fileChunk); - if (chunkSize >= batchSize) { - sendBatchData(); - } + byte[] data = "".getBytes(); + if (fileChunk.getChunk() != null) { + data = fileChunk.getChunk(); + } + long chunkLength = data.length; + if (batchSize > 0 && batchInterval > 0) { + hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType()); + hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid()); + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { + hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + ""); + hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); } else { - String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); - HttpPut httpPut = new HttpPut(url); - httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN)); - httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); - httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); - String filename = fileChunk.getFileName(); - if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { - httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { - filename = filename + "." + fileChunk.getFileType(); - httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { - httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType()); - } - if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { - httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + ""); - httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); - } else { - httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); - httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); - } - httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); - Map metaMap = fileChunk.getMeta(); - if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""); - } - } - httpPut.setEntity(new ByteArrayEntity(data)); - executeRequest(httpPut); - chunksOutCounter.inc(); - bytesOutCounter.inc(chunkLength); - calculateFileChunkMetrics(fileChunk); + hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); + hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); } - } catch (Exception e) { - LOG.error("put part to hos error.", e); - errorChunksCounter.inc(); + hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); + Map metaMap = fileChunk.getMeta(); + if (metaMap != null && !metaMap.isEmpty()) { + for (String meta : metaMap.keySet()) { + hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); + } + } + objectsMeta += hosMessage.toString() + ";"; + hosMessage.clear(); + objectsOffset += chunkLength + ";"; + byteList.add(data); + chunkSize += chunkLength; + chunksOutCounter.inc(); + bytesOutCounter.inc(chunkLength); + calculateFileChunkMetrics(fileChunk); + if (chunkSize >= batchSize) { + sendBatchData(); + } + } else { + String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); + HttpPut httpPut = new HttpPut(url); + httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN)); + httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); + httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); + String filename = fileChunk.getFileName(); + if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { + httpPut.setHeader(HOS_META_FILENAME, filename); + } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { + filename = filename + "." + fileChunk.getFileType(); + httpPut.setHeader(HOS_META_FILENAME, filename); + } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { + httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType()); + } + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { + httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + ""); + httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); + } else { + httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); + httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); + } + httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); + Map metaMap = fileChunk.getMeta(); + if (metaMap != null && !metaMap.isEmpty()) { + for (String meta : metaMap.keySet()) { + httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); + } + } + httpPut.setEntity(new ByteArrayEntity(data)); + executeRequest(httpPut); + chunksOutCounter.inc(); + bytesOutCounter.inc(chunkLength); + calculateFileChunkMetrics(fileChunk); } } @@ -324,7 +322,7 @@ public class HosSink extends RichSinkFunction { httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset); byte[][] bytes = new byte[byteList.size()][]; byteList.toArray(bytes); - byte[] newData = ArrayUtil.addAll(bytes); + byte[] newData = PrimitiveArrayUtil.addAll(bytes); httpPut.setEntity(new ByteArrayEntity(newData)); executeRequest(httpPut); objectsMeta = ""; @@ -333,7 +331,7 @@ public class HosSink extends RichSinkFunction { chunkSize = 0; } - private void executeRequest(HttpPut httpPut) { + private void executeRequest(HttpPut httpPut) throws RuntimeException { if (isAsync) { asyncHttpClient.execute(httpPut, new FutureCallback() { @Override @@ -341,19 +339,22 @@ public class HosSink extends RichSinkFunction { try { if (httpResponse.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8"); - LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error.", e); + LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); errorChunksCounter.inc(); } } @Override public void failed(Exception ex) { - LOG.error("put part to hos error.", ex); + LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex); errorChunksCounter.inc(); + if (ex instanceof IllegalStateException || ex instanceof IOReactorException) { + throw new RuntimeException(ex); + } if (loadBalanceMode == 1 && ex instanceof ConnectException) { endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; @@ -371,11 +372,11 @@ public class HosSink extends RichSinkFunction { response = syncHttpClient.execute(httpPut); if (response.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); - LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error.", e); + LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); errorChunksCounter.inc(); if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) { endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java index 0ef9785..8aaecff 100644 --- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java +++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java @@ -22,6 +22,7 @@ import org.apache.http.concurrent.FutureCallback; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.util.EntityUtils; import java.io.IOException; @@ -206,48 +207,42 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { } private void sendFile(FileChunk fileChunk, Map metaMap) { - String url = ""; - try { - byte[] data; - String fileType = fileChunk.getFileType(); - if (fileChunk.getChunk() != null) { - data = fileChunk.getChunk(); - } else { - data = "".getBytes(); - } - String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : ""; - String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0"; - String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : ""; - String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : ""; - String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : ""; - String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : ""; - String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : ""; - String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : ""; - String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0"; - url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" + - "cfg_id=" + policyId + - "&file_id=" + fileId + - "&file_type=" + fileType + - "&found_time=" + foundTime + - "&s_ip=" + serverIP + - "&s_port=" + serverPort + - "&d_ip=" + clientIP + - "&d_port=" + clientPort + - "&domain=" + domain + - "&account=" + subscriberId); - HttpPost httpPost = new HttpPost(url); - httpPost.setEntity(new ByteArrayEntity(data)); - executeRequest(httpPost, url); - chunksOutCounter.inc(); - bytesOutCounter.inc(data.length); - calculateFileChunkMetrics(fileChunk, fileId); - } catch (Exception e) { - LOG.error("post file error. current url: " + url, e); - errorChunksCounter.inc(); + byte[] data; + String fileType = fileChunk.getFileType(); + if (fileChunk.getChunk() != null) { + data = fileChunk.getChunk(); + } else { + data = "".getBytes(); } + String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : ""; + String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0"; + String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : ""; + String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : ""; + String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : ""; + String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : ""; + String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : ""; + String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : ""; + String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0"; + String url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" + + "cfg_id=" + policyId + + "&file_id=" + fileId + + "&file_type=" + fileType + + "&found_time=" + foundTime + + "&s_ip=" + serverIP + + "&s_port=" + serverPort + + "&d_ip=" + clientIP + + "&d_port=" + clientPort + + "&domain=" + domain + + "&account=" + subscriberId); + HttpPost httpPost = new HttpPost(url); + httpPost.setEntity(new ByteArrayEntity(data)); + executeRequest(httpPost, url); + chunksOutCounter.inc(); + bytesOutCounter.inc(data.length); + calculateFileChunkMetrics(fileChunk, fileId); } - private void executeRequest(HttpPost httpPost, String url) { + private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{ if (isAsync) { asyncHttpClient.execute(httpPost, new FutureCallback() { @Override @@ -273,6 +268,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { public void failed(Exception ex) { LOG.error("post file error. current url: " + url, ex); errorChunksCounter.inc(); + if (ex instanceof IllegalStateException || ex instanceof IOReactorException) { + throw new RuntimeException(ex); + } } @Override