Fix hos sink load imbalance after a downed hos node recovers

pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>file-chunk-combiner</artifactId>
<version>1.3.4</version>
<version>1.3.5</version>

<repositories>
<repository>
@@ -280,6 +280,12 @@
<artifactId>jaxb-impl</artifactId>
<version>2.3.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
</dependencies>

<build>
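The Guava 33.0.0-jre dependency lines up with the new `com.google.common.util.concurrent.RateLimiter` imports in HBaseSink and HosSink below; that is an inference from the diff, not stated in the commit. A minimal sketch of the API those sinks rely on, with made-up numbers:

```java
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterSketch {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(100.0); // roughly 100 permits per second (hypothetical rate)
        int sent = 0;
        for (int i = 0; i < 1_000; i++) {
            if (limiter.tryAcquire()) { // non-blocking: returns false once the current budget is spent
                sent++;
            }
        }
        System.out.println("sent " + sent + " of 1000");
    }
}
```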
@@ -1,6 +1,6 @@
package com.zdjizhi;

import cn.hutool.core.util.StrUtil;
import cn.hutool.core.text.CharSequenceUtil;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
import com.zdjizhi.function.map.ParseMessagePackMapFunction;
@@ -42,7 +42,7 @@ public class FileChunkCombiner {

List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(ProcessingTimeTrigger.create());
if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
if (Boolean.TRUE.equals(configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER))) {
triggers.add(LastChunkTrigger.create());
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
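The only change in this hunk wraps the config read in `Boolean.TRUE.equals(...)`. Presumably this guards against a null `Boolean` from the option, which the bare `if (...)` would auto-unbox into a NullPointerException. A tiny illustration (the null value is hypothetical):

```java
public class NullSafeFlagSketch {
    public static void main(String[] args) {
        Boolean lastChunkTriggerEnabled = null; // e.g. an option with no value and no default
        // if (lastChunkTriggerEnabled) { ... }  -> would throw NullPointerException on unboxing
        if (Boolean.TRUE.equals(lastChunkTriggerEnabled)) {
            System.out.println("LastChunkTrigger added");
        } else {
            System.out.println("LastChunkTrigger skipped");
        }
    }
}
```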
@@ -58,8 +58,9 @@ public class FileChunkCombiner {
SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator;
for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
switch (sinkType) {
default:
case "hos":
if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
.name("Filter: Hos")
@@ -75,7 +76,7 @@ public class FileChunkCombiner {
}
break;
case "hbase":
if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
.name("Filter: HBase")
@@ -91,6 +91,9 @@ public class Configs {
public static final ConfigOption<Integer> SINK_HOS_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hos.batch.interval.ms")
.intType()
.defaultValue(0);
public static final ConfigOption<Integer> SINK_HOS_HEALTH_CHECK_INTERVAL_MS = ConfigOptions.key("sink.hos.health.check.interval.ms")
.intType()
.defaultValue(60000);

public static final ConfigOption<Integer> SINK_HTTP_CLIENT_MAX_TOTAL = ConfigOptions.key("sink.http.client.max.total")
.intType()
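The new `sink.hos.health.check.interval.ms` option controls how often HosSink re-probes its endpoints (default 60000 ms, i.e. once a minute). A minimal sketch of defining and reading such a Flink `ConfigOption`; the override value below is hypothetical:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class HealthCheckConfigSketch {
    // Mirrors the option added to Configs: probe interval for hos endpoint health checks.
    static final ConfigOption<Integer> SINK_HOS_HEALTH_CHECK_INTERVAL_MS =
            ConfigOptions.key("sink.hos.health.check.interval.ms")
                    .intType()
                    .defaultValue(60000);

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.set(SINK_HOS_HEALTH_CHECK_INTERVAL_MS, 30000); // hypothetical override: probe every 30s
        System.out.println(configuration.get(SINK_HOS_HEALTH_CHECK_INTERVAL_MS));
    }
}
```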
@@ -1,10 +1,10 @@
package com.zdjizhi.function;

import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.PrimitiveArrayUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
@@ -162,7 +162,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
}
}
if (waitingToCombineChunkList.size() > 0) {
if (!waitingToCombineChunkList.isEmpty()) {
FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString());
if (fileChunk != null) {
combinedFileChunkList.add(fileChunk);
@@ -183,12 +183,12 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
fileChunk.setChunkCount(byteList.size());
byte[][] bytes = new byte[byteList.size()][];
byteList.toArray(bytes);
byte[] newData = ArrayUtil.addAll(bytes);
byte[] newData = PrimitiveArrayUtil.addAll(bytes);
if (COMBINE_MODE_SEEK.equals(combineMode)) {
fileChunk.setOffset(offset);
fileChunk.setLastChunkFlag(lastChunkFlag);
} else {
if (StringUtil.isNotEmpty(chunkNumbers)) {
if (StringUtils.isNotEmpty(chunkNumbers)) {
fileChunk.setChunkNumbers(chunkNumbers);
}
}
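Swapping `ArrayUtil.addAll` for `PrimitiveArrayUtil.addAll` keeps the same behavior of concatenating the collected chunk payloads into one byte array; in recent Hutool versions the primitive-array overloads live in `PrimitiveArrayUtil` (which `ArrayUtil` extends), so this looks like a cleanup rather than a behavior change. A plain-Java equivalent of what the call does:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ConcatChunksSketch {
    // Concatenate chunk payloads in order, like PrimitiveArrayUtil.addAll(byte[]...).
    static byte[] concat(byte[][] chunks) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[][] chunks = { {1, 2}, {3}, {4, 5, 6} };
        System.out.println(concat(chunks).length); // 6
    }
}
```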
@@ -1,6 +1,6 @@
package com.zdjizhi.function;

import cn.hutool.core.util.StrUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
@@ -19,8 +19,8 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
public transient Counter filterChunksCounter;
public transient Counter emlChunksCounter;
public transient Counter txtChunksCounter;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
private transient JexlExpression jexlExpression;
private transient JexlContext jexlContext;

public FileChunkFilterFunction(String filterExpression, String functionName) {
this.filterExpression = filterExpression;
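Marking the JEXL expression and context `transient` means they are no longer shipped inside the serialized filter function; they are presumably rebuilt in `open()` on each task manager, the usual pattern for non-serialization-friendly members of Flink rich functions. A self-contained sketch of the JEXL evaluation these functions perform (the expression and the `Chunk` stand-in are hypothetical):

```java
import org.apache.commons.jexl3.JexlBuilder;
import org.apache.commons.jexl3.JexlContext;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.commons.jexl3.MapContext;

public class JexlFilterSketch {
    // Minimal stand-in for FileChunk with the two properties the example expression reads.
    public static class Chunk {
        private final int length;
        private final String fileType;
        public Chunk(int length, String fileType) { this.length = length; this.fileType = fileType; }
        public int getLength() { return length; }
        public String getFileType() { return fileType; }
    }

    public static void main(String[] args) {
        JexlExpression expression = new JexlBuilder().create()
                .createExpression("FileChunk.length > 1024 && FileChunk.fileType == 'eml'");
        JexlContext context = new MapContext();
        context.set("FileChunk", new Chunk(2048, "eml"));
        // Same pattern as FileChunkFilterFunction.filter(): evaluate, then parse the result as a boolean.
        boolean keep = Boolean.parseBoolean(expression.evaluate(context).toString());
        System.out.println(keep); // true
    }
}
```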
@@ -48,7 +48,7 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
filterChunksCounter.inc();
return false;
}
if (StrUtil.isNotEmpty(filterExpression)) {
if (CharSequenceUtil.isNotEmpty(filterExpression)) {
jexlContext.set("FileChunk", value);
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
filterChunksCounter.inc();
@@ -225,9 +225,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
case "meta":
String meta = messageUnpacker.unpackString();
JSONObject metaJsonObject = JSONUtil.parseObj(meta);
for (String key : metaJsonObject.keySet()) {
metaMap.put(key, metaJsonObject.get(key));
}
metaJsonObject.keySet().forEach(key -> metaMap.put(key, metaJsonObject.get(key)));
fileChunk.setMeta(metaMap);
break;
default:
@@ -45,7 +45,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction<Strin
try {
chunksInCounter.inc();
JSONObject record = JSONObject.parseObject(value);
if (record.containsKey("proxy_rule_list") && record.getJSONArray("proxy_rule_list").size() > 0) {
if (record.containsKey("proxy_rule_list") && !record.getJSONArray("proxy_rule_list").isEmpty()) {
if (record.containsKey("http_request_body")) {
FileChunk fileChunk = new FileChunk();
Map<String, Object> metaMap = new HashMap<>();
@@ -84,7 +84,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction<Strin
}

private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("serverIP", record.getString("server_ip"));
metaMap.put("serverPort", record.getInteger("server_port"));
metaMap.put("clientIP", record.getString("client_ip"));
@@ -49,10 +49,8 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction<Str
try {
chunksInCounter.inc();
JSONObject record = JSONObject.parseObject(value);
if (record.containsKey("security_rule_list")
&& record.getJSONArray("security_rule_list").size() > 0
|| record.containsKey("monitor_rule_list")
&& record.getJSONArray("monitor_rule_list").size() > 0) {
if (record.containsKey("security_rule_list") && !record.getJSONArray("security_rule_list").isEmpty()
|| record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty()) {
if (record.containsKey("http_request_body")) {
FileChunk fileChunk = new FileChunk();
Map<String, Object> metaMap = new HashMap<>();
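The rewritten condition keeps the original grouping because `&&` binds tighter than `||`. A small sketch of the equivalent, explicitly parenthesized check, assuming the fastjson `JSONObject` used by the surrounding code:

```java
import com.alibaba.fastjson.JSONObject;

public class RuleHitSketch {
    // Hypothetical helper mirroring the rewritten condition with explicit parentheses.
    static boolean hasRuleHit(JSONObject record) {
        return (record.containsKey("security_rule_list") && !record.getJSONArray("security_rule_list").isEmpty())
                || (record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty());
    }

    public static void main(String[] args) {
        JSONObject record = JSONObject.parseObject("{\"monitor_rule_list\":[42]}");
        System.out.println(hasRuleHit(record)); // true
    }
}
```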
@@ -103,7 +101,7 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction<Str
}

private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("serverIP", record.getString("server_ip"));
metaMap.put("serverPort", record.getInteger("server_port"));
metaMap.put("clientIP", record.getString("client_ip"));
@@ -1,9 +1,10 @@
package com.zdjizhi.sink;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HBaseColumnConstants;
@@ -63,27 +64,26 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
private Connection syncHBaseConnection;
private AsyncConnection asyncHBaseConnection;
private Table table;
private Table indexTimeTable;
private Table indexFilenameTable;
private AsyncTable<AdvancedScanResultConsumer> asyncTable;
private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
private List<Put> dataPutList;
private List<Put> indexTimePutList;
private List<Put> indexFilenamePutList;
private transient Connection syncHBaseConnection;
private transient AsyncConnection asyncHBaseConnection;
private transient Table table;
private transient Table indexTimeTable;
private transient Table indexFilenameTable;
private transient AsyncTable<AdvancedScanResultConsumer> asyncTable;
private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
private transient List<Put> dataPutList;
private transient List<Put> indexTimePutList;
private transient List<Put> indexFilenamePutList;
private long chunkSize;
private long batchSize;
private long batchInterval;
private ScheduledExecutorService executorService;
private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
private volatile long timestamp;
private long count;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
private transient JexlExpression jexlExpression;
private transient JexlContext jexlContext;
private transient RateLimiter rateLimiter;

public HBaseSink(Configuration configuration) {
this.configuration = configuration;
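The long run of `transient` additions here (and in HosSink and OssSinkByCaffeineCache below) follows the standard Flink pattern: the sink instance is serialized when the job is deployed, so connections, tables, executors, buffers, and JEXL objects are excluded from serialization and created in `open()` on the worker. A minimal sketch of that pattern, with a placeholder in place of the real HBase/HTTP clients:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TransientClientSink extends RichSinkFunction<String> {
    private final String endpoint;          // plain configuration: serialized with the job graph
    private transient StringBuilder buffer; // stand-in for a non-serializable client or buffer

    public TransientClientSink(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new StringBuilder();       // rebuilt on the task manager after deserialization
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.append(endpoint).append(' ').append(value).append('\n');
    }
}
```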
@@ -161,7 +161,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
}
timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS);
dataPutList = new ArrayList<>();
@@ -178,40 +177,30 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}

@Override
public void invoke(FileChunk fileChunk, Context context) {
synchronized (this) {
long currentTimeMillis = System.currentTimeMillis();
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
count++;
if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
} else if (currentTimeMillis - timestamp >= 1000) {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
timestamp = currentTimeMillis;
count = 0;
} else {
if(rateLimiter.tryAcquire()){
sendFileChunk(fileChunk);
}else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
}
} else {
sendFileChunk(fileChunk);
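This hunk reworks the rate-limit setup and the per-chunk decision in invoke(), and appears to introduce a Guava RateLimiter for pacing. A simplified, self-contained model of the decision it makes per chunk; `excluded` stands for `checkFileChunk()`, i.e. the JEXL exclusion expression that lets selected chunks bypass the limit:

```java
import com.google.common.util.concurrent.RateLimiter;

public class SinkRateLimitSketch {
    private final long threshold;
    private final RateLimiter rateLimiter;
    private long windowStart = System.currentTimeMillis();
    private long count;

    SinkRateLimitSketch(long threshold) {
        this.threshold = threshold;
        this.rateLimiter = RateLimiter.create(threshold);
    }

    // Returns true when the chunk should be sent, false when it should be counted as dropped.
    synchronized boolean admit(boolean excluded) {
        long now = System.currentTimeMillis();
        count++;
        if (now - windowStart < 1000 && count > threshold) {
            return excluded;                              // over budget inside the current second
        } else if (now - windowStart >= 1000) {
            windowStart = now;                            // roll the one-second window
            count = 0;
            return excluded;
        } else {
            return rateLimiter.tryAcquire() || excluded;  // smooth pacing while under the budget
        }
    }

    public static void main(String[] args) {
        SinkRateLimitSketch sketch = new SinkRateLimitSketch(100); // hypothetical 100 chunks per second
        System.out.println(sketch.admit(false));
    }
}
```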
@@ -303,21 +292,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
return null;
});
dataPutList.clear();
if (indexTimePutList.size() > 0) {
if (!indexTimePutList.isEmpty()) {
asyncIndexTimeTable.batch(indexTimePutList);
indexTimePutList.clear();
}
if (indexFilenamePutList.size() > 0) {
if (!indexFilenamePutList.isEmpty()) {
asyncIndexFilenameTable.batch(indexFilenamePutList);
indexFilenamePutList.clear();
}
} else {
try {
table.batch(dataPutList, null);
if (indexTimePutList.size() > 0) {
if (!indexTimePutList.isEmpty()) {
indexTimeTable.batch(indexTimePutList, null);
}
if (indexFilenamePutList.size() > 0) {
if (!indexFilenamePutList.isEmpty()) {
indexFilenameTable.batch(indexFilenamePutList, null);
}
} catch (IOException | InterruptedException e) {
@@ -334,7 +323,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}

private boolean checkFileChunk(FileChunk fileChunk) {
if (StrUtil.isNotEmpty(rateLimitExpression)) {
if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}
@@ -6,6 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.*;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HttpClientUtil;

@@ -29,6 +30,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Executors;
@@ -71,13 +73,13 @@ public class HosSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
private CloseableHttpClient syncHttpClient;
private CloseableHttpAsyncClient asyncHttpClient;
private transient CloseableHttpClient syncHttpClient;
private transient CloseableHttpAsyncClient asyncHttpClient;
private int loadBalanceMode;
private List<String> endpointList;
private volatile String endpoint;
private List<String> healthyEndpoints;
private volatile String healthyEndpoint;
private String token;
private volatile String bathPutUrl;
private String bathPutKey;
private HashMap<String, String> hosMessage;
private String objectsMeta;
private String objectsOffset;

@@ -85,13 +87,12 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private long batchSize;
private long batchInterval;
private long chunkSize;
private ScheduledExecutorService executorService;
private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
private volatile long timestamp;
private long count;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
private transient JexlExpression jexlExpression;
private transient JexlContext jexlContext;
private transient RateLimiter rateLimiter;

public HosSink(Configuration configuration) {
this.configuration = configuration;
@@ -157,13 +158,25 @@ public class HosSink extends RichSinkFunction<FileChunk> {
metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter));
metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter));
metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(","));
if (endpointList.size() == 1) {
executorService = Executors.newScheduledThreadPool(2);
String[] endpoints = configuration.get(Configs.SINK_HOS_ENDPOINT).split(",");
healthyEndpoints = new ArrayList<>();
healthyEndpoints.addAll(Arrays.asList(endpoints));
if (endpoints.length == 1) {
loadBalanceMode = 0;
endpoint = endpointList.get(0);
} else {
healthyEndpoint = healthyEndpoints.get(0);
} else if (endpoints.length > 1) {
loadBalanceMode = 1;
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
executorService.scheduleWithFixedDelay(() -> {
for (String endpoint : endpoints) {
if (!PublicUtil.checkHealth(endpoint)) {
healthyEndpoints.remove(endpoint);
} else if (!healthyEndpoints.contains(endpoint)) {
healthyEndpoints.add(endpoint);
}
}
}, configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), TimeUnit.MILLISECONDS);
}
token = configuration.get(Configs.SINK_HOS_TOKEN);
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
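This is the heart of the fix named in the commit message. Previously each subtask picked one endpoint from the configured list at `open()` time and only re-picked on request failures, which is presumably why load stayed skewed on the surviving nodes after a crashed hos instance came back. The new code keeps a `healthyEndpoints` list, re-probes every configured endpoint on a fixed delay via `PublicUtil.checkHealth`, and (as the later `invoke()` hunk shows) draws a random healthy endpoint per record. A self-contained sketch of that pool, with a stubbed probe in place of the real health check:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class EndpointPoolSketch {
    private final List<String> allEndpoints;
    private final List<String> healthyEndpoints;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    EndpointPoolSketch(List<String> endpoints, long checkIntervalMs) {
        this.allEndpoints = List.copyOf(endpoints);
        this.healthyEndpoints = new CopyOnWriteArrayList<>(endpoints);
        // Failed nodes leave the pool; recovered nodes are added back, so load spreads over them again.
        scheduler.scheduleWithFixedDelay(() -> {
            for (String endpoint : allEndpoints) {
                if (!probe(endpoint)) {
                    healthyEndpoints.remove(endpoint);
                } else if (!healthyEndpoints.contains(endpoint)) {
                    healthyEndpoints.add(endpoint);
                }
            }
        }, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
    }

    // Pick a random healthy endpoint per request, as HosSink.invoke() now does.
    String pick() {
        if (healthyEndpoints.isEmpty()) {
            throw new IllegalStateException("No healthy hos endpoints available");
        }
        return healthyEndpoints.get(ThreadLocalRandom.current().nextInt(healthyEndpoints.size()));
    }

    // Stand-in for PublicUtil.checkHealth(endpoint): GET <endpoint>/actuator/health and look for "UP".
    private boolean probe(String endpoint) {
        return true;
    }
}
```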
@@ -173,17 +186,15 @@ public class HosSink extends RichSinkFunction<FileChunk> {
} else {
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
}
timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS);
if (batchSize > 0 && batchInterval > 0) {
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
bathPutKey = PublicUtil.getUUID();
hosMessage = new HashMap<>();
byteList = new ArrayList<>();
objectsMeta = "";
objectsOffset = "";
chunkSize = 0;
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
synchronized (this) {
if (!byteList.isEmpty()) {
@@ -192,40 +203,39 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}

@Override
public void invoke(FileChunk fileChunk, Context context) throws RuntimeException {
synchronized (this) {
long currentTimeMillis = System.currentTimeMillis();
if (loadBalanceMode == 1) {
if (healthyEndpoints.isEmpty()) {
throw new RuntimeException("No healthy hos endpoints available");
} else if (healthyEndpoints.size() == 1) {
healthyEndpoint = healthyEndpoints.get(0);
} else {
healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
}
}
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
count++;
if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
} else if (currentTimeMillis - timestamp >= 1000) {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
timestamp = currentTimeMillis;
count = 0;
} else {
if (rateLimiter.tryAcquire()) {
sendFileChunk(fileChunk);
} else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
}
} else {
sendFileChunk(fileChunk);
@@ -261,9 +271,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
for (String meta : metaMap.keySet()) {
hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
}
metaMap.keySet().forEach(meta -> hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
objectsMeta += hosMessage.toString() + ";";
hosMessage.clear();
@@ -277,18 +285,18 @@ public class HosSink extends RichSinkFunction<FileChunk> {
sendBatchData();
}
} else {
String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
String url = URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
HttpPut httpPut = new HttpPut(url);
httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
String filename = fileChunk.getFileName();
if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
if (CharSequenceUtil.isNotEmpty(filename) && filename.contains(".")) {
httpPut.setHeader(HOS_META_FILENAME, filename);
} else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
} else if (CharSequenceUtil.isNotEmpty(filename) && !filename.contains(".")) {
filename = filename + "." + fileChunk.getFileType();
httpPut.setHeader(HOS_META_FILENAME, filename);
} else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
} else if (CharSequenceUtil.isEmpty(filename) && CharSequenceUtil.isNotEmpty(fileChunk.getFileType())) {
httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
}
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
@@ -301,9 +309,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
for (String meta : metaMap.keySet()) {
httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
}
metaMap.keySet().forEach(meta -> httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
httpPut.setEntity(new ByteArrayEntity(data));
executeRequest(httpPut);
@@ -314,7 +320,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}

private void sendBatchData() {
HttpPut httpPut = new HttpPut(bathPutUrl);
HttpPut httpPut = new HttpPut(URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + bathPutKey) + "?multiFile");
httpPut.setHeader(TOKEN, token);
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, COMBINE_MODE_SEEK);
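`sendBatchData()` no longer reuses a `bathPutUrl` computed once in `open()`; it rebuilds the URL from whichever endpoint is currently selected as healthy, so batched uploads also follow the rebalanced pool. A small sketch of the URL construction, using Hutool's `URLUtil.normalize` and made-up endpoint, bucket, and key values:

```java
import cn.hutool.core.util.URLUtil;

public class BatchPutUrlSketch {
    public static void main(String[] args) {
        String healthyEndpoint = "http://10.0.0.12:8080";        // hypothetical hos node
        String bucket = "file-bucket";                           // hypothetical sink.hos.bucket value
        String bathPutKey = "0f8fad5b84a145c6b0d1c3f2a9e7d4aa";  // PublicUtil.getUUID() in the real code
        String url = URLUtil.normalize(healthyEndpoint + "/hos/" + bucket + "/" + bathPutKey) + "?multiFile";
        System.out.println(url); // prints the normalized batch PUT URL
    }
}
```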
@@ -333,37 +339,37 @@ public class HosSink extends RichSinkFunction<FileChunk> {

private void executeRequest(HttpPut httpPut) throws RuntimeException {
if (isAsync) {
asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
asyncHttpClient.execute(httpPut, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
if (httpResponse.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
}
}

@Override
public void failed(Exception ex) {
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex);
LOG.error("put part to hos error. request failed. url: " + httpPut.getURI().toString(), ex);
errorChunksCounter.inc();
if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
throw new RuntimeException(ex);
}
if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) {
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
URI uri = httpPut.getURI();
healthyEndpoints.remove(uri.getHost() + ":" + uri.getPort());
}
}

@Override
public void cancelled() {

LOG.error("put part to hos error. request cancelled. url: " + httpPut.getURI().toString());
}
});
} else {
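On network-level failures the async callback no longer re-picks a random endpoint from the full list; it derives `host:port` from the failing request's URI and drops that entry from `healthyEndpoints`, leaving re-admission to the periodic health check. A tiny illustration of how the endpoint key is recovered from the URI (assuming endpoints are configured as `host:port`):

```java
import java.net.URI;

public class FailedEndpointKeySketch {
    public static void main(String[] args) {
        URI uri = URI.create("http://10.0.0.12:8080/hos/file-bucket/some-object"); // hypothetical failing request
        String endpointKey = uri.getHost() + ":" + uri.getPort();
        System.out.println(endpointKey); // 10.0.0.12:8080 -> removed from healthyEndpoints
    }
}
```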
@@ -372,14 +378,14 @@ public class HosSink extends RichSinkFunction<FileChunk> {
response = syncHttpClient.execute(httpPut);
if (response.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) {
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
healthyEndpoints.remove(healthyEndpoint);
}
} finally {
IoUtil.close(response);
@@ -388,7 +394,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}

private boolean checkFileChunk(FileChunk fileChunk) {
if (StrUtil.isNotEmpty(rateLimitExpression)) {
if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}
@@ -71,10 +71,10 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
public transient Counter between100KBAnd1MBTxtChunksCounter;
public transient Counter greaterThan10MBTxtChunksCounter;
private boolean isAsync;
private CloseableHttpClient syncHttpClient;
private CloseableHttpAsyncClient asyncHttpClient;
private transient CloseableHttpClient syncHttpClient;
private transient CloseableHttpAsyncClient asyncHttpClient;
private List<String> endpointList;
private Cache<String, FileChunk> cache;
private transient Cache<String, FileChunk> cache;

public OssSinkByCaffeineCache(Configuration configuration) {
this.configuration = configuration;
@@ -242,9 +242,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
calculateFileChunkMetrics(fileChunk, fileId);
}

private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{
private void executeRequest(HttpPost httpPost, String url) throws RuntimeException {
if (isAsync) {
asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
asyncHttpClient.execute(httpPost, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {

@@ -275,7 +275,7 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {

@Override
public void cancelled() {

LOG.error("post file error. request cancelled. url: " + url);
}
});
} else {
@@ -1,7 +1,13 @@
package com.zdjizhi.utils;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.crypto.digest.DigestUtil;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class PublicUtil {
@@ -19,4 +25,28 @@ public class PublicUtil {
public static String getIndexDataHead(String filename) {
return getRowKey(filename).substring(0, 1);
}

public static boolean checkHealth(String endpoint) {
boolean isHealth = false;
InputStream inputStream = null;
try {
URL url = new URL(URLUtil.normalize(endpoint + "/actuator/health"));
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setRequestMethod("GET");
urlConnection.setConnectTimeout(10000);
urlConnection.setReadTimeout(10000);
int responseCode = urlConnection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
inputStream = urlConnection.getInputStream();
String responseBody = IoUtil.read(inputStream, StandardCharsets.UTF_8);
if (responseBody.contains("UP")) {
isHealth = true;
}
}
} catch (Exception ignored) {
} finally {
IoUtil.close(inputStream);
}
return isHealth;
}
}
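The new `checkHealth` helper probes `<endpoint>/actuator/health` with 10-second timeouts and treats an HTTP 200 whose body contains "UP" as healthy, which matches a Spring Boot Actuator style health page. A hypothetical usage, assuming the project classpath:

```java
public class CheckHealthUsage {
    public static void main(String[] args) {
        // Probe a hypothetical hos node the same way the HosSink health-check task does.
        boolean up = com.zdjizhi.utils.PublicUtil.checkHealth("http://10.0.0.12:8080");
        System.out.println(up ? "endpoint UP" : "endpoint DOWN");
    }
}
```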