diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java index b2bab91..3d90ec7 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java @@ -49,13 +49,6 @@ import java.security.cert.X509Certificate; */ public class HttpAsyncClient { - /* - private static int socketTimeout = 60000;//设置等待数据超时时间60秒钟 根据业务调整 - private static int connectTimeout = 60000;//连接超时 - private static int poolSize = 5000;//连接池最大连接数 - private static int maxPerRoute = 2500;//每个主机的并发最多只有1500 - private static int connectionRequestTimeout = 90000; //从连接池中后去连接的timeout时间 - */ private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整 private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时 private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数 @@ -85,7 +78,7 @@ public class HttpAsyncClient { } /** - * 新版createAsyncClient(boolean proxy)---20200425注释 + * 新版createAsyncClient(boolean proxy) * @param proxy * @return * @throws KeyManagementException @@ -104,7 +97,7 @@ public class HttpAsyncClient { .setConnectionRequestTimeout(connectionRequestTimeout) .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) - .setStaleConnectionCheckEnabled(true)//逐出已被关闭的链接-20201017 + .setStaleConnectionCheckEnabled(true) .build(); SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); @@ -140,8 +133,7 @@ public class HttpAsyncClient { // 配置io线程 IOReactorConfig ioReactorConfig = IOReactorConfig.custom() -// .setSoKeepAlive(false)//20201017移除 - .setSoKeepAlive(true)//20201017新修改 + .setSoKeepAlive(true) .setTcpNoDelay(true) .setIoThreadCount(Runtime.getRuntime().availableProcessors()) .build(); @@ -177,7 +169,7 @@ public class HttpAsyncClient { conMgr.setDefaultConnectionConfig(connectionConfig); /** - * 定义一个strategy-keep-alive-20201017 + * 定义一个strategy */ ConnectionKeepAliveStrategy kaStrategy = new ConnectionKeepAliveStrategy() { @Override @@ -204,14 +196,14 @@ public class HttpAsyncClient { .setProxy(new HttpHost(host, port)) .setDefaultCookieStore(new BasicCookieStore()) .setDefaultRequestConfig(requestConfig) - .setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 + .setKeepAliveStrategy(kaStrategy) .build(); } else { return HttpAsyncClients.custom().setConnectionManager(conMgr) .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) .setDefaultCookieStore(new BasicCookieStore()) - .setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 + .setKeepAliveStrategy(kaStrategy) .build(); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java index b85fe42..be3461b 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java @@ -778,8 +778,7 @@ public class HttpClientUtil { * @param msgSessionCookie * @return */ -// public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) {//20201018注释掉 - public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018-新增发送时间 + public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { HttpPost httpPost = null; urlProducer = urlProducer.trim(); byte[] resultArray = null; @@ -828,8 +827,8 @@ public class HttpClientUtil { // set header httpPost.addHeader("User-Agent", userAgent); -// httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);//关闭长连接-20191225新增-使用字符代替 - httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);//打开长连接-20201017修改-使用字符代替 +// httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);//关闭长连接 + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);//打开长连接 //一部状态回传需要此Cookie打开 // if ("monitor-msg".equals(topic)) { // httpPost.addHeader("Cookie", msgSessionCookie);//广东文件消息测试时加上Cookie会验证不通过,即那边显示为两个Cookie,不加Cookie则验证通过 @@ -874,8 +873,7 @@ public class HttpClientUtil { "最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志 //执行请求 -// AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count); - AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);//20201018新增发送时间 + AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); httpClient.execute(httpPost, asyncPostMsgCallBack); } catch (MalformedURLException e) { diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java index cb85c6d..02dc167 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java @@ -30,10 +30,9 @@ public class AsyncPostMsgCallBack implements FutureCallback { private String userAgent; private String msgSessionCookie; private int count; - private long postTime;//20201018新增发送时间 + private long postTime; - // public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) { - public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018新增发送时间 + public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { this.postMsgUrl = postMsgUrl; this.topicName = topicName; this.dataJson = dataJson; @@ -110,8 +109,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { HttpEntity entity = response.getEntity(); String ret = null; if (entity != null) { -// ret = EntityUtils.toString(entity);//旧-20201018移除 - ret = EntityUtils.toString(entity, "UTF-8");//20201018新增修改,添加UTF-8 + ret = EntityUtils.toString(entity, "UTF-8"); } logger.info("返回的生产原始响应体String数据为:" + ret); @@ -139,7 +137,6 @@ public class AsyncPostMsgCallBack implements FutureCallback { "数据加载成功,返回码: " + statuCode); AvroMonitorTimerTask.msgSuccessSum++; -// EntityUtils.consume(entity); } else { switch (resRedirBodyCode) { case 300: @@ -163,7 +160,6 @@ public class AsyncPostMsgCallBack implements FutureCallback { break; case 410: logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); -// YbHttpAvroSinkFile.redirectContents.add(dataJson); YbHttpAvroSinkFile.updateCookie(); break; case 500: @@ -175,12 +171,11 @@ public class AsyncPostMsgCallBack implements FutureCallback { AvroMonitorTimerTask.msgFailedSum++; break; } -// EntityUtils.consume(entity); } if (entity != null) { try { - EntityUtils.consume(entity);//20201018移到外部 + EntityUtils.consume(entity); } catch (final IOException ex) { } } @@ -189,8 +184,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志 e.printStackTrace(); } finally { - HttpClientUtils.closeQuietly(response);//20201018移入finally -// EntityUtils.consumeQuietly(response.getEntity()); + HttpClientUtils.closeQuietly(response); } } @@ -213,8 +207,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { AvroMonitorTimerTask.msgFailedSum++; logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); } else { -// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试 - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试//20201018新增发送时间 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试 } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 739f7e0..1dc445c 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -346,8 +346,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { AvroMonitorTimerTask.msgReadyPostSum += contents.size(); for (String content : contents) { -// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0 } } else {//sessionCookie不为空 logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); @@ -357,8 +356,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { @Override public void run() { try { -// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0 } catch (Exception e) { logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); } @@ -422,8 +420,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { redirectContents.clear(); AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size(); for (String content : tmpListFreq) { -// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0 - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0//20201018新增发送时间 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0 } logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==."); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java index fd34e71..09ff00a 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java @@ -57,8 +57,7 @@ public class AvroMonitorTimerTask { if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报 String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428 logger.info("Send monitor message is===>>>" + sendMsg + "<<<==="); -// HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0 - HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0//20201018新增发送时间 + HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0 } } catch (Exception e) { logger.error("Send monitorMsg to zx is error===>>>" + e + "<===");