修复hos挂掉一台后,http sink仍往挂掉hos写数据的问题
This commit is contained in:
@@ -20,8 +20,6 @@ import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.concurrent.FutureCallback;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.conn.HttpHostConnectException;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
@@ -29,7 +27,9 @@ import org.apache.http.nio.reactor.IOReactorException;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@@ -355,7 +355,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
||||
if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
|
||||
if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) {
|
||||
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
|
||||
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
|
||||
}
|
||||
@@ -378,7 +378,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
||||
} catch (IOException e) {
|
||||
LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
|
||||
errorChunksCounter.inc();
|
||||
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
|
||||
if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) {
|
||||
endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
|
||||
}
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user