diff --git a/pom.xml b/pom.xml index 227be72..71ba746 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi file-chunk-combiner - 1.3.4 + 1.3.5 @@ -280,6 +280,12 @@ jaxb-impl 2.3.1 + + + com.google.guava + guava + 33.0.0-jre + diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java index 6955039..e56c018 100644 --- a/src/main/java/com/zdjizhi/FileChunkCombiner.java +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -1,6 +1,6 @@ package com.zdjizhi; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.text.CharSequenceUtil; import com.zdjizhi.config.Configs; import com.zdjizhi.function.*; import com.zdjizhi.function.map.ParseMessagePackMapFunction; @@ -42,7 +42,7 @@ public class FileChunkCombiner { List> triggers = new ArrayList<>(); triggers.add(ProcessingTimeTrigger.create()); - if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) { + if (Boolean.TRUE.equals(configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER))) { triggers.add(LastChunkTrigger.create()); } Trigger trigger = MultipleTrigger.of(triggers); @@ -58,8 +58,9 @@ public class FileChunkCombiner { SingleOutputStreamOperator fileMetaProxySingleOutputStreamOperator; for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) { switch (sinkType) { + default: case "hos": - if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { + if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { windowStream .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos")) .name("Filter: Hos") @@ -75,7 +76,7 @@ public class FileChunkCombiner { } break; case "hbase": - if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { + if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { windowStream .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase")) .name("Filter: HBase") diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java index 25ba9aa..65f0784 100644 --- a/src/main/java/com/zdjizhi/config/Configs.java +++ b/src/main/java/com/zdjizhi/config/Configs.java @@ -91,6 +91,9 @@ public class Configs { public static final ConfigOption SINK_HOS_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hos.batch.interval.ms") .intType() .defaultValue(0); + public static final ConfigOption SINK_HOS_HEALTH_CHECK_INTERVAL_MS = ConfigOptions.key("sink.hos.health.check.interval.ms") + .intType() + .defaultValue(60000); public static final ConfigOption SINK_HTTP_CLIENT_MAX_TOTAL = ConfigOptions.key("sink.http.client.max.total") .intType() diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java index 736758d..929af5c 100644 --- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java +++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java @@ -1,10 +1,10 @@ package com.zdjizhi.function; -import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.PrimitiveArrayUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.pojo.FileChunk; -import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MeterView; @@ -162,7 +162,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction 0) { + if (!waitingToCombineChunkList.isEmpty()) { FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString()); if (fileChunk != null) { combinedFileChunkList.add(fileChunk); @@ -183,12 +183,12 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction { public transient Counter filterChunksCounter; public transient Counter emlChunksCounter; public transient Counter txtChunksCounter; - private JexlExpression jexlExpression; - private JexlContext jexlContext; + private transient JexlExpression jexlExpression; + private transient JexlContext jexlContext; public FileChunkFilterFunction(String filterExpression, String functionName) { this.filterExpression = filterExpression; @@ -48,7 +48,7 @@ public class FileChunkFilterFunction extends RichFilterFunction { filterChunksCounter.inc(); return false; } - if (StrUtil.isNotEmpty(filterExpression)) { + if (CharSequenceUtil.isNotEmpty(filterExpression)) { jexlContext.set("FileChunk", value); if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) { filterChunksCounter.inc(); diff --git a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java index be556a4..66b9516 100644 --- a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java +++ b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java @@ -225,9 +225,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction metaMap.put(key, metaJsonObject.get(key))); fileChunk.setMeta(metaMap); break; default: diff --git a/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java index 8b7ce56..a3b1f16 100644 --- a/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java +++ b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java @@ -45,7 +45,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction 0) { + if (record.containsKey("proxy_rule_list") && !record.getJSONArray("proxy_rule_list").isEmpty()) { if (record.containsKey("http_request_body")) { FileChunk fileChunk = new FileChunk(); Map metaMap = new HashMap<>(); @@ -84,7 +84,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction metaMap) { - metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0); + metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0); metaMap.put("serverIP", record.getString("server_ip")); metaMap.put("serverPort", record.getInteger("server_port")); metaMap.put("clientIP", record.getString("client_ip")); diff --git a/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java index cdbb131..cb68d93 100644 --- a/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java +++ b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java @@ -49,10 +49,8 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction 0 - || record.containsKey("monitor_rule_list") - && record.getJSONArray("monitor_rule_list").size() > 0) { + if (record.containsKey("security_rule_list") && !record.getJSONArray("security_rule_list").isEmpty() + || record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty()) { if (record.containsKey("http_request_body")) { FileChunk fileChunk = new FileChunk(); Map metaMap = new HashMap<>(); @@ -103,7 +101,7 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction metaMap) { - metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0); + metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0); metaMap.put("serverIP", record.getString("server_ip")); metaMap.put("serverPort", record.getInteger("server_port")); metaMap.put("clientIP", record.getString("client_ip")); diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java index 3ba9d15..321fc00 100644 --- a/src/main/java/com/zdjizhi/sink/HBaseSink.java +++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java @@ -1,9 +1,10 @@ package com.zdjizhi.sink; import cn.hutool.core.io.IoUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.google.common.util.concurrent.RateLimiter; import com.zdjizhi.config.Configs; import com.zdjizhi.pojo.FileChunk; import com.zdjizhi.utils.HBaseColumnConstants; @@ -63,27 +64,26 @@ public class HBaseSink extends RichSinkFunction { public transient Counter pcapngChunksCounter; public transient Counter mediaChunksCounter; private boolean isAsync; - private Connection syncHBaseConnection; - private AsyncConnection asyncHBaseConnection; - private Table table; - private Table indexTimeTable; - private Table indexFilenameTable; - private AsyncTable asyncTable; - private AsyncTable asyncIndexTimeTable; - private AsyncTable asyncIndexFilenameTable; - private List dataPutList; - private List indexTimePutList; - private List indexFilenamePutList; + private transient Connection syncHBaseConnection; + private transient AsyncConnection asyncHBaseConnection; + private transient Table table; + private transient Table indexTimeTable; + private transient Table indexFilenameTable; + private transient AsyncTable asyncTable; + private transient AsyncTable asyncIndexTimeTable; + private transient AsyncTable asyncIndexFilenameTable; + private transient List dataPutList; + private transient List indexTimePutList; + private transient List indexFilenamePutList; private long chunkSize; private long batchSize; private long batchInterval; - private ScheduledExecutorService executorService; + private transient ScheduledExecutorService executorService; private long rateLimitThreshold; private String rateLimitExpression; - private volatile long timestamp; - private long count; - private JexlExpression jexlExpression; - private JexlContext jexlContext; + private transient JexlExpression jexlExpression; + private transient JexlContext jexlContext; + private transient RateLimiter rateLimiter; public HBaseSink(Configuration configuration) { this.configuration = configuration; @@ -161,7 +161,6 @@ public class HBaseSink extends RichSinkFunction { indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET))); indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET))); } - timestamp = System.currentTimeMillis(); batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE); batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS); dataPutList = new ArrayList<>(); @@ -178,40 +177,30 @@ public class HBaseSink extends RichSinkFunction { } }, batchInterval, batchInterval, TimeUnit.MILLISECONDS); } + rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); if (rateLimitThreshold > 0) { - rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION); - count = 0; JexlEngine jexlEngine = new JexlBuilder().create(); jexlExpression = jexlEngine.createExpression(rateLimitExpression); jexlContext = new MapContext(); + rateLimiter = RateLimiter.create(rateLimitThreshold); } } @Override public void invoke(FileChunk fileChunk, Context context) { synchronized (this) { - long currentTimeMillis = System.currentTimeMillis(); chunksInCounter.inc(); bytesInCounter.inc(fileChunk.getLength()); if (rateLimitThreshold > 0) { - count++; - if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - } else if (currentTimeMillis - timestamp >= 1000) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - timestamp = currentTimeMillis; - count = 0; - } else { + if(rateLimiter.tryAcquire()){ sendFileChunk(fileChunk); + }else { + if (checkFileChunk(fileChunk)) { + sendFileChunk(fileChunk); + } else { + rateLimitDropChunksCounter.inc(); + } } } else { sendFileChunk(fileChunk); @@ -303,21 +292,21 @@ public class HBaseSink extends RichSinkFunction { return null; }); dataPutList.clear(); - if (indexTimePutList.size() > 0) { + if (!indexTimePutList.isEmpty()) { asyncIndexTimeTable.batch(indexTimePutList); indexTimePutList.clear(); } - if (indexFilenamePutList.size() > 0) { + if (!indexFilenamePutList.isEmpty()) { asyncIndexFilenameTable.batch(indexFilenamePutList); indexFilenamePutList.clear(); } } else { try { table.batch(dataPutList, null); - if (indexTimePutList.size() > 0) { + if (!indexTimePutList.isEmpty()) { indexTimeTable.batch(indexTimePutList, null); } - if (indexFilenamePutList.size() > 0) { + if (!indexFilenamePutList.isEmpty()) { indexFilenameTable.batch(indexFilenamePutList, null); } } catch (IOException | InterruptedException e) { @@ -334,7 +323,7 @@ public class HBaseSink extends RichSinkFunction { } private boolean checkFileChunk(FileChunk fileChunk) { - if (StrUtil.isNotEmpty(rateLimitExpression)) { + if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) { jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString()); } diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index 06a18a3..59a3209 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -6,6 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.core.util.*; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.google.common.util.concurrent.RateLimiter; import com.zdjizhi.config.Configs; import com.zdjizhi.pojo.FileChunk; import com.zdjizhi.utils.HttpClientUtil; @@ -29,6 +30,7 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; import java.io.InterruptedIOException; import java.net.SocketException; +import java.net.URI; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.Executors; @@ -71,13 +73,13 @@ public class HosSink extends RichSinkFunction { public transient Counter pcapngChunksCounter; public transient Counter mediaChunksCounter; private boolean isAsync; - private CloseableHttpClient syncHttpClient; - private CloseableHttpAsyncClient asyncHttpClient; + private transient CloseableHttpClient syncHttpClient; + private transient CloseableHttpAsyncClient asyncHttpClient; private int loadBalanceMode; - private List endpointList; - private volatile String endpoint; + private List healthyEndpoints; + private volatile String healthyEndpoint; private String token; - private volatile String bathPutUrl; + private String bathPutKey; private HashMap hosMessage; private String objectsMeta; private String objectsOffset; @@ -85,13 +87,12 @@ public class HosSink extends RichSinkFunction { private long batchSize; private long batchInterval; private long chunkSize; - private ScheduledExecutorService executorService; + private transient ScheduledExecutorService executorService; private long rateLimitThreshold; private String rateLimitExpression; - private volatile long timestamp; - private long count; - private JexlExpression jexlExpression; - private JexlContext jexlContext; + private transient JexlExpression jexlExpression; + private transient JexlContext jexlContext; + private transient RateLimiter rateLimiter; public HosSink(Configuration configuration) { this.configuration = configuration; @@ -157,13 +158,25 @@ public class HosSink extends RichSinkFunction { metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter)); metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter)); metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter)); - endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(",")); - if (endpointList.size() == 1) { + executorService = Executors.newScheduledThreadPool(2); + String[] endpoints = configuration.get(Configs.SINK_HOS_ENDPOINT).split(","); + healthyEndpoints = new ArrayList<>(); + healthyEndpoints.addAll(Arrays.asList(endpoints)); + if (endpoints.length == 1) { loadBalanceMode = 0; - endpoint = endpointList.get(0); - } else { + healthyEndpoint = healthyEndpoints.get(0); + } else if (endpoints.length > 1) { loadBalanceMode = 1; - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); + healthyEndpoint = RandomUtil.randomEle(healthyEndpoints); + executorService.scheduleWithFixedDelay(() -> { + for (String endpoint : endpoints) { + if (!PublicUtil.checkHealth(endpoint)) { + healthyEndpoints.remove(endpoint); + } else if (!healthyEndpoints.contains(endpoint)) { + healthyEndpoints.add(endpoint); + } + } + }, configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), TimeUnit.MILLISECONDS); } token = configuration.get(Configs.SINK_HOS_TOKEN); isAsync = configuration.getBoolean(Configs.SINK_ASYNC); @@ -173,17 +186,15 @@ public class HosSink extends RichSinkFunction { } else { syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient(); } - timestamp = System.currentTimeMillis(); batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE); batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS); if (batchSize > 0 && batchInterval > 0) { - bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + bathPutKey = PublicUtil.getUUID(); hosMessage = new HashMap<>(); byteList = new ArrayList<>(); objectsMeta = ""; objectsOffset = ""; chunkSize = 0; - executorService = Executors.newScheduledThreadPool(1); executorService.scheduleWithFixedDelay(() -> { synchronized (this) { if (!byteList.isEmpty()) { @@ -192,40 +203,39 @@ public class HosSink extends RichSinkFunction { } }, batchInterval, batchInterval, TimeUnit.MILLISECONDS); } + rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); if (rateLimitThreshold > 0) { - rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION); - count = 0; JexlEngine jexlEngine = new JexlBuilder().create(); jexlExpression = jexlEngine.createExpression(rateLimitExpression); jexlContext = new MapContext(); + rateLimiter = RateLimiter.create(rateLimitThreshold); } } @Override public void invoke(FileChunk fileChunk, Context context) throws RuntimeException { synchronized (this) { - long currentTimeMillis = System.currentTimeMillis(); + if (loadBalanceMode == 1) { + if (healthyEndpoints.isEmpty()) { + throw new RuntimeException("No healthy hos endpoints available"); + } else if (healthyEndpoints.size() == 1) { + healthyEndpoint = healthyEndpoints.get(0); + } else { + healthyEndpoint = RandomUtil.randomEle(healthyEndpoints); + } + } chunksInCounter.inc(); bytesInCounter.inc(fileChunk.getLength()); if (rateLimitThreshold > 0) { - count++; - if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - } else if (currentTimeMillis - timestamp >= 1000) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - timestamp = currentTimeMillis; - count = 0; - } else { + if (rateLimiter.tryAcquire()) { sendFileChunk(fileChunk); + } else { + if (checkFileChunk(fileChunk)) { + sendFileChunk(fileChunk); + } else { + rateLimitDropChunksCounter.inc(); + } } } else { sendFileChunk(fileChunk); @@ -261,9 +271,7 @@ public class HosSink extends RichSinkFunction { hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); Map metaMap = fileChunk.getMeta(); if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); - } + metaMap.keySet().forEach(meta -> hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "")); } objectsMeta += hosMessage.toString() + ";"; hosMessage.clear(); @@ -277,18 +285,18 @@ public class HosSink extends RichSinkFunction { sendBatchData(); } } else { - String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); + String url = URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); HttpPut httpPut = new HttpPut(url); httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN)); httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); String filename = fileChunk.getFileName(); - if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { + if (CharSequenceUtil.isNotEmpty(filename) && filename.contains(".")) { httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { + } else if (CharSequenceUtil.isNotEmpty(filename) && !filename.contains(".")) { filename = filename + "." + fileChunk.getFileType(); httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { + } else if (CharSequenceUtil.isEmpty(filename) && CharSequenceUtil.isNotEmpty(fileChunk.getFileType())) { httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType()); } if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { @@ -301,9 +309,7 @@ public class HosSink extends RichSinkFunction { httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); Map metaMap = fileChunk.getMeta(); if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); - } + metaMap.keySet().forEach(meta -> httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "")); } httpPut.setEntity(new ByteArrayEntity(data)); executeRequest(httpPut); @@ -314,7 +320,7 @@ public class HosSink extends RichSinkFunction { } private void sendBatchData() { - HttpPut httpPut = new HttpPut(bathPutUrl); + HttpPut httpPut = new HttpPut(URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + bathPutKey) + "?multiFile"); httpPut.setHeader(TOKEN, token); httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); httpPut.setHeader(HOS_COMBINE_MODE, COMBINE_MODE_SEEK); @@ -333,37 +339,37 @@ public class HosSink extends RichSinkFunction { private void executeRequest(HttpPut httpPut) throws RuntimeException { if (isAsync) { - asyncHttpClient.execute(httpPut, new FutureCallback() { + asyncHttpClient.execute(httpPut, new FutureCallback<>() { @Override public void completed(HttpResponse httpResponse) { try { if (httpResponse.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8"); - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e); errorChunksCounter.inc(); } } @Override public void failed(Exception ex) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex); + LOG.error("put part to hos error. request failed. url: " + httpPut.getURI().toString(), ex); errorChunksCounter.inc(); if (ex instanceof IllegalStateException || ex instanceof IOReactorException) { throw new RuntimeException(ex); } if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) { - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); - bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + URI uri = httpPut.getURI(); + healthyEndpoints.remove(uri.getHost() + ":" + uri.getPort()); } } @Override public void cancelled() { - + LOG.error("put part to hos error. request cancelled. url: " + httpPut.getURI().toString()); } }); } else { @@ -372,14 +378,14 @@ public class HosSink extends RichSinkFunction { response = syncHttpClient.execute(httpPut); if (response.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e); errorChunksCounter.inc(); if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) { - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); + healthyEndpoints.remove(healthyEndpoint); } } finally { IoUtil.close(response); @@ -388,7 +394,7 @@ public class HosSink extends RichSinkFunction { } private boolean checkFileChunk(FileChunk fileChunk) { - if (StrUtil.isNotEmpty(rateLimitExpression)) { + if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) { jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString()); } diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java index 8aaecff..542bc26 100644 --- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java +++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java @@ -71,10 +71,10 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { public transient Counter between100KBAnd1MBTxtChunksCounter; public transient Counter greaterThan10MBTxtChunksCounter; private boolean isAsync; - private CloseableHttpClient syncHttpClient; - private CloseableHttpAsyncClient asyncHttpClient; + private transient CloseableHttpClient syncHttpClient; + private transient CloseableHttpAsyncClient asyncHttpClient; private List endpointList; - private Cache cache; + private transient Cache cache; public OssSinkByCaffeineCache(Configuration configuration) { this.configuration = configuration; @@ -242,9 +242,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { calculateFileChunkMetrics(fileChunk, fileId); } - private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{ + private void executeRequest(HttpPost httpPost, String url) throws RuntimeException { if (isAsync) { - asyncHttpClient.execute(httpPost, new FutureCallback() { + asyncHttpClient.execute(httpPost, new FutureCallback<>() { @Override public void completed(HttpResponse httpResponse) { try { @@ -275,7 +275,7 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { @Override public void cancelled() { - + LOG.error("post file error. request cancelled. url: " + url); } }); } else { diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java index ef027bc..1ef2e7f 100644 --- a/src/main/java/com/zdjizhi/utils/PublicUtil.java +++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java @@ -1,7 +1,13 @@ package com.zdjizhi.utils; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.URLUtil; import cn.hutool.crypto.digest.DigestUtil; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.*; public class PublicUtil { @@ -19,4 +25,28 @@ public class PublicUtil { public static String getIndexDataHead(String filename) { return getRowKey(filename).substring(0, 1); } + + public static boolean checkHealth(String endpoint) { + boolean isHealth = false; + InputStream inputStream = null; + try { + URL url = new URL(URLUtil.normalize(endpoint + "/actuator/health")); + HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(); + urlConnection.setRequestMethod("GET"); + urlConnection.setConnectTimeout(10000); + urlConnection.setReadTimeout(10000); + int responseCode = urlConnection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + inputStream = urlConnection.getInputStream(); + String responseBody = IoUtil.read(inputStream, StandardCharsets.UTF_8); + if (responseBody.contains("UP")) { + isHealth = true; + } + } + } catch (Exception ignored) { + } finally { + IoUtil.close(inputStream); + } + return isHealth; + } }