diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java index a27dfaf..02a1d17 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java @@ -143,10 +143,8 @@ public class HttpClientUtil { HttpClientContext localContext = HttpClientContext.create(); BasicCookieStore cookieStore = new BasicCookieStore(); - httpPut = new HttpPut(baseUrl); - // httpPost.setHeader("Transfer-Encoding","close"); // httpPost.setHeader("Content-Type", "binary/octet-stream"); httpPut.setHeader("Content-Type", "binary/octet-stream"); diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 63a6d72..8af28d2 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -90,6 +90,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { logger.error("Starting YbHttpAvroSinkFile is error==>checkMsgUrl and postMsgUrl can not be null!!!!"); } + new Thread(new RetrySendMessage()).start(); logger.warn("Starting YbHttpAvroSinkFile ... ..."); } @@ -327,8 +328,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { /** * 获取状态回传sessionID,检查认证是否存在 */ - if (StringUtils.isBlank(monitorSessionCookie) - || StringUtils.isBlank(msgSessionCookie)) { + if (StringUtils.isBlank(monitorSessionCookie) + || StringUtils.isBlank(msgSessionCookie)) { getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 if (!checkTimerStart) { @@ -345,9 +346,13 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } - logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); + logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); } + if (StringUtils.isBlank(configInfo.getMsgSessionCookie())) { + throw new RuntimeException("Get Cookie Error : " + configInfo.getCheckMsgUrl()); + } + AvroMonitorTimerTask.addTotalNum(contents.size()); AvroMonitorTimerTask.addReadyPostNum(contents.size()); if (isSingle(topicName)) { diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java index 231a929..7980e91 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java @@ -104,6 +104,7 @@ public class HttpManager { } }; ctx.init(null, new TrustManager[]{tm}, null); + SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); Registry socketFactoryRegistry = RegistryBuilder.create() .register("http", PlainConnectionSocketFactory.INSTANCE) @@ -123,6 +124,7 @@ public class HttpManager { } httpClient = getHttpClient(); + } //请求重试机制 @@ -169,6 +171,7 @@ public class HttpManager { RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(3000) .setSocketTimeout(3000) + .setCookieSpec(CookieSpecs.IGNORE_COOKIES) //.setCookieSpec(CookieSpecs.BEST_MATCH) .build(); // 声明重定向策略对象 @@ -648,6 +651,7 @@ public class HttpManager { httpPost.addHeader("User-Agent", userAgent); httpPost.addHeader("X-Tag", xTag);//根据最新文档,目前已经不需要此头-20191217 httpPost.addHeader("Content-Type", "application/json"); + httpPost.setHeader("Cookie", ""); // httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); StringEntity payload = new StringEntity(data, Charset.forName("utf-8")); //payload.setContentType("text/xml; charset=UTF-8");