2024-01-22 17:33:39 +08:00
|
|
|
package com.zdjizhi.sink;
|
|
|
|
|
|
2024-02-29 19:03:07 +08:00
|
|
|
import cn.hutool.core.io.IoUtil;
|
|
|
|
|
import cn.hutool.core.util.*;
|
|
|
|
|
import cn.hutool.log.Log;
|
|
|
|
|
import cn.hutool.log.LogFactory;
|
|
|
|
|
import com.zdjizhi.config.Configs;
|
2024-01-22 17:33:39 +08:00
|
|
|
import com.zdjizhi.pojo.FileChunk;
|
|
|
|
|
import com.zdjizhi.utils.HttpClientUtil;
|
|
|
|
|
import com.zdjizhi.utils.PublicUtil;
|
2024-02-29 19:03:07 +08:00
|
|
|
import org.apache.commons.lang.CharEncoding;
|
2024-01-22 17:33:39 +08:00
|
|
|
import org.apache.flink.configuration.Configuration;
|
|
|
|
|
import org.apache.flink.metrics.Counter;
|
2024-03-13 10:37:11 +08:00
|
|
|
import org.apache.flink.metrics.MeterView;
|
2024-01-22 17:33:39 +08:00
|
|
|
import org.apache.flink.metrics.MetricGroup;
|
|
|
|
|
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
|
2024-02-29 19:03:07 +08:00
|
|
|
import org.apache.http.HttpResponse;
|
|
|
|
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
|
|
|
|
import org.apache.http.client.methods.HttpPut;
|
|
|
|
|
import org.apache.http.concurrent.FutureCallback;
|
|
|
|
|
import org.apache.http.conn.ConnectTimeoutException;
|
|
|
|
|
import org.apache.http.conn.HttpHostConnectException;
|
|
|
|
|
import org.apache.http.entity.ByteArrayEntity;
|
|
|
|
|
import org.apache.http.impl.client.CloseableHttpClient;
|
|
|
|
|
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
|
|
|
|
import org.apache.http.util.EntityUtils;
|
2024-01-22 17:33:39 +08:00
|
|
|
|
|
|
|
|
import java.io.IOException;
|
2024-02-29 19:03:07 +08:00
|
|
|
import java.net.ConnectException;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
|
|
|
|
|
import static com.zdjizhi.utils.HttpHeaderConstants.*;
|
|
|
|
|
import static com.zdjizhi.utils.PublicConstants.*;
|
2024-01-22 17:33:39 +08:00
|
|
|
|
|
|
|
|
public class HosSink extends RichSinkFunction<FileChunk> {
|
2024-02-29 19:03:07 +08:00
|
|
|
private static final Log LOG = LogFactory.get();
|
2024-01-22 17:33:39 +08:00
|
|
|
|
|
|
|
|
private final Configuration configuration;
|
2024-03-19 15:11:02 +08:00
|
|
|
public transient Counter sinkRequestsCounter;
|
|
|
|
|
public transient Counter sinkErrorRequestsCounter;
|
|
|
|
|
public transient Counter sinkFilesCounter;
|
|
|
|
|
public transient Counter sinkChunksCounter;
|
|
|
|
|
public transient Counter lessThan5KBChunksCounter;
|
|
|
|
|
public transient Counter between5KBAnd10KBChunksCounter;
|
|
|
|
|
public transient Counter between10KBAnd50KBChunksCounter;
|
|
|
|
|
public transient Counter between50KBAnd100KBChunksCounter;
|
|
|
|
|
public transient Counter between100KBAnd1MBChunksCounter;
|
|
|
|
|
public transient Counter greaterThan1MBChunksCounter;
|
2024-02-29 19:03:07 +08:00
|
|
|
private boolean isAsync;
|
|
|
|
|
private CloseableHttpClient syncHttpClient;
|
|
|
|
|
private CloseableHttpAsyncClient asyncHttpClient;
|
|
|
|
|
private int loadBalanceMode;
|
|
|
|
|
private volatile String endpoint;
|
|
|
|
|
private List<String> ipList;
|
|
|
|
|
private List<String> portList;
|
|
|
|
|
private String token;
|
|
|
|
|
private volatile String bathPutUrl;
|
|
|
|
|
private HashMap<String, String> hosMessage;
|
|
|
|
|
private String objectsMeta = "";
|
|
|
|
|
private String objectsOffset = "";
|
|
|
|
|
private List<byte[]> byteList;
|
|
|
|
|
private long maxBatchSize;
|
|
|
|
|
private long maxBatchCount;
|
|
|
|
|
private long chunkSize = 0;
|
|
|
|
|
private int chunkCount = 0;
|
2024-01-22 17:33:39 +08:00
|
|
|
|
|
|
|
|
public HosSink(Configuration configuration) {
|
|
|
|
|
this.configuration = configuration;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void open(Configuration parameters) throws Exception {
|
|
|
|
|
super.open(parameters);
|
|
|
|
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
2024-03-19 15:11:02 +08:00
|
|
|
lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
|
|
|
|
|
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
|
|
|
|
|
between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
|
|
|
|
|
between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
|
|
|
|
|
between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
|
|
|
|
|
greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
|
|
|
|
|
metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
|
|
|
|
|
metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
|
|
|
|
|
metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
|
|
|
|
|
metricGroup.meter("numBetween50KBAnd100KBChunkPsOuterSecond", new MeterView(between50KBAnd100KBChunksCounter));
|
|
|
|
|
metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
|
|
|
|
|
metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
|
|
|
|
|
sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
|
|
|
|
|
sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
|
|
|
|
|
sinkFilesCounter = metricGroup.counter("sinkFilesCount");
|
|
|
|
|
sinkChunksCounter = metricGroup.counter("sinkChunksCount");
|
|
|
|
|
metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
|
|
|
|
|
metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
|
|
|
|
|
metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
|
|
|
|
|
metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
|
|
|
|
|
|
2024-02-29 19:03:07 +08:00
|
|
|
loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
|
|
|
|
|
if (loadBalanceMode == 0) {
|
|
|
|
|
endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
|
|
|
|
|
} else if (loadBalanceMode == 1) {
|
|
|
|
|
String[] ipPortArr = configuration.get(Configs.SINK_HOS_ENDPOINT).split(":");
|
|
|
|
|
ipList = Arrays.asList(ipPortArr[0].split(","));
|
|
|
|
|
portList = Arrays.asList(ipPortArr[1].split(","));
|
|
|
|
|
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
|
|
|
|
}
|
|
|
|
|
token = configuration.get(Configs.SINK_HOS_TOKEN);
|
|
|
|
|
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
|
|
|
|
|
if (isAsync) {
|
|
|
|
|
asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
|
|
|
|
|
asyncHttpClient.start();
|
|
|
|
|
} else {
|
|
|
|
|
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
|
|
|
|
|
}
|
|
|
|
|
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
|
|
|
|
|
maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
|
|
|
|
|
maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
|
|
|
|
|
hosMessage = new HashMap<>();
|
|
|
|
|
objectsMeta = "";
|
|
|
|
|
objectsOffset = "";
|
|
|
|
|
byteList = new ArrayList<>();
|
2024-01-22 17:33:39 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void invoke(FileChunk fileChunk, Context context) {
|
2024-02-29 19:03:07 +08:00
|
|
|
byte[] data = "".getBytes();
|
|
|
|
|
if (fileChunk.getChunk() != null) {
|
|
|
|
|
data = fileChunk.getChunk();
|
|
|
|
|
}
|
2024-03-19 15:11:02 +08:00
|
|
|
long chunkLength = data.length;
|
|
|
|
|
sinkChunksCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
if (configuration.get(Configs.SINK_BATCH)) {
|
|
|
|
|
hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
|
|
|
|
|
hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
|
|
|
|
|
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
|
|
|
|
|
hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
|
|
|
|
|
hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
|
|
|
|
if (fileChunk.getOffset() == 0) {
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkFilesCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
|
|
|
|
hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
|
|
|
|
|
}
|
|
|
|
|
hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
|
|
|
|
|
Map<String, Object> metaMap = fileChunk.getMeta();
|
|
|
|
|
if (metaMap != null && metaMap.size() > 0) {
|
|
|
|
|
for (String meta : metaMap.keySet()) {
|
|
|
|
|
hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
objectsMeta += hosMessage.toString() + ";";
|
|
|
|
|
hosMessage.clear();
|
2024-03-19 15:11:02 +08:00
|
|
|
objectsOffset += chunkLength + ";";
|
2024-02-29 19:03:07 +08:00
|
|
|
byteList.add(data);
|
|
|
|
|
chunkCount++;
|
2024-03-19 15:11:02 +08:00
|
|
|
chunkSize += chunkLength;
|
|
|
|
|
calculateChunkSize(chunkLength);
|
2024-02-29 19:03:07 +08:00
|
|
|
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
|
|
|
|
|
HttpPut httpPut = new HttpPut(bathPutUrl);
|
|
|
|
|
httpPut.setHeader(TOKEN, token);
|
|
|
|
|
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
|
|
|
|
|
httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
|
|
|
|
|
httpPut.setHeader(HOS_OBJECTS_META, objectsMeta);
|
|
|
|
|
httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset);
|
|
|
|
|
byte[][] bytes = new byte[byteList.size()][];
|
|
|
|
|
byteList.toArray(bytes);
|
|
|
|
|
byte[] newData = ArrayUtil.addAll(bytes);
|
|
|
|
|
httpPut.setEntity(new ByteArrayEntity(newData));
|
|
|
|
|
byteList.clear();
|
|
|
|
|
executeRequest(httpPut);
|
|
|
|
|
objectsMeta = "";
|
|
|
|
|
objectsOffset = "";
|
|
|
|
|
chunkSize = 0;
|
|
|
|
|
chunkCount = 0;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
|
|
|
|
|
HttpPut httpPut = new HttpPut(url);
|
|
|
|
|
httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
|
|
|
|
|
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
|
|
|
|
|
httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
|
|
|
|
|
String filename = fileChunk.getFileName();
|
|
|
|
|
if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
|
|
|
|
|
httpPut.setHeader(HOS_META_FILENAME, filename);
|
|
|
|
|
} else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
|
|
|
|
|
filename = filename + "." + fileChunk.getFileType();
|
|
|
|
|
httpPut.setHeader(HOS_META_FILENAME, filename);
|
|
|
|
|
} else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
|
|
|
|
|
httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
|
|
|
|
|
}
|
|
|
|
|
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
|
|
|
|
|
httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
|
|
|
|
|
httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
|
|
|
|
if (fileChunk.getOffset() == 0) {
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkFilesCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
|
|
|
|
httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
|
|
|
|
|
}
|
|
|
|
|
httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
|
|
|
|
|
Map<String, Object> metaMap = fileChunk.getMeta();
|
|
|
|
|
if (metaMap != null && metaMap.size() > 0) {
|
|
|
|
|
for (String meta : metaMap.keySet()) {
|
|
|
|
|
httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-03-04 15:43:18 +08:00
|
|
|
httpPut.setEntity(new ByteArrayEntity(data));
|
2024-03-19 15:11:02 +08:00
|
|
|
calculateChunkSize(chunkLength);
|
2024-02-29 19:03:07 +08:00
|
|
|
executeRequest(httpPut);
|
|
|
|
|
}
|
2024-01-22 17:33:39 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
2024-02-29 19:03:07 +08:00
|
|
|
public void close() {
|
|
|
|
|
IoUtil.close(syncHttpClient);
|
|
|
|
|
IoUtil.close(asyncHttpClient);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void executeRequest(HttpPut httpPut) {
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
if (isAsync) {
|
|
|
|
|
asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void completed(HttpResponse httpResponse) {
|
|
|
|
|
try {
|
|
|
|
|
if (httpResponse.getStatusLine().getStatusCode() != 200) {
|
|
|
|
|
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8);
|
|
|
|
|
LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkErrorRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
}
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
LOG.error("put part to hos error.", e);
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkErrorRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(Exception ex) {
|
|
|
|
|
LOG.error("put part to hos error.", ex);
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkErrorRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
|
|
|
|
|
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
|
|
|
|
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void cancelled() {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
CloseableHttpResponse response = null;
|
|
|
|
|
try {
|
|
|
|
|
response = syncHttpClient.execute(httpPut);
|
|
|
|
|
if (response.getStatusLine().getStatusCode() != 200) {
|
|
|
|
|
String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8);
|
|
|
|
|
LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkErrorRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
}
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
LOG.error("put part to hos error.", e);
|
2024-03-19 15:11:02 +08:00
|
|
|
sinkErrorRequestsCounter.inc();
|
2024-02-29 19:03:07 +08:00
|
|
|
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
|
|
|
|
|
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
IoUtil.close(response);
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-22 17:33:39 +08:00
|
|
|
}
|
2024-03-19 15:11:02 +08:00
|
|
|
|
|
|
|
|
private void calculateChunkSize(long length) {
|
|
|
|
|
if (length <= 5 * 1024) {
|
|
|
|
|
lessThan5KBChunksCounter.inc();
|
|
|
|
|
} else if (length <= 10 * 1024) {
|
|
|
|
|
between5KBAnd10KBChunksCounter.inc();
|
|
|
|
|
} else if (length <= 50 * 1024) {
|
|
|
|
|
between10KBAnd50KBChunksCounter.inc();
|
|
|
|
|
} else if (length <= 100 * 1024) {
|
|
|
|
|
between50KBAnd100KBChunksCounter.inc();
|
|
|
|
|
} else if (length <= 1024 * 1024) {
|
|
|
|
|
between100KBAnd1MBChunksCounter.inc();
|
|
|
|
|
} else {
|
|
|
|
|
greaterThan1MBChunksCounter.inc();
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-22 17:33:39 +08:00
|
|
|
}
|