diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java index 8d4579d..b2bab91 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java @@ -1,8 +1,7 @@ package cn.ac.iie.cusflume.sink.HttpAsyncUtils; import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig; -import org.apache.http.Consts; -import org.apache.http.HttpHost; +import org.apache.http.*; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.MalformedChallengeException; @@ -14,9 +13,9 @@ import org.apache.http.config.ConnectionConfig; import org.apache.http.config.Lookup; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.conn.ssl.SSLContexts; import org.apache.http.impl.auth.*; import org.apache.http.impl.client.BasicCookieStore; import org.apache.http.impl.client.BasicCredentialsProvider; @@ -25,11 +24,14 @@ import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.message.BasicHeaderElementIterator; import org.apache.http.nio.conn.NoopIOSessionStrategy; import org.apache.http.nio.conn.SchemeIOSessionStrategy; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOReactorException; +import org.apache.http.protocol.HTTP; +import org.apache.http.protocol.HttpContext; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -101,7 +103,9 @@ public class HttpAsyncClient { RequestConfig requestConfig = RequestConfig.custom() .setConnectionRequestTimeout(connectionRequestTimeout) .setConnectTimeout(connectTimeout) - .setSocketTimeout(socketTimeout).build(); + .setSocketTimeout(socketTimeout) + .setStaleConnectionCheckEnabled(true)//逐出已被关闭的链接-20201017 + .build(); SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); X509TrustManager tm = new X509TrustManager() { @@ -135,7 +139,10 @@ public class HttpAsyncClient { .build(); // 配置io线程 - IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true) + IOReactorConfig ioReactorConfig = IOReactorConfig.custom() +// .setSoKeepAlive(false)//20201017移除 + .setSoKeepAlive(true)//20201017新修改 + .setTcpNoDelay(true) .setIoThreadCount(Runtime.getRuntime().availableProcessors()) .build(); // 设置连接池大小 @@ -169,18 +176,43 @@ public class HttpAsyncClient { .build(); conMgr.setDefaultConnectionConfig(connectionConfig); + /** + * 定义一个strategy-keep-alive-20201017 + */ + ConnectionKeepAliveStrategy kaStrategy = new ConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse response, HttpContext context) { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && param.equalsIgnoreCase + ("timeout")) { + return Long.parseLong(value) * 1000; + } + } + return 10 * 1000;//如果没有约定,则默认定义时长为10s + } + }; + if (proxy) { return HttpAsyncClients.custom().setConnectionManager(conMgr) .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) .setProxy(new HttpHost(host, port)) .setDefaultCookieStore(new BasicCookieStore()) - .setDefaultRequestConfig(requestConfig).build(); + .setDefaultRequestConfig(requestConfig) + .setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 + .build(); } else { return HttpAsyncClients.custom().setConnectionManager(conMgr) .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) - .setDefaultCookieStore(new BasicCookieStore()).build(); + .setDefaultCookieStore(new BasicCookieStore()) + .setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 + .build(); } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java index 094db0b..b85fe42 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java @@ -778,7 +778,8 @@ public class HttpClientUtil { * @param msgSessionCookie * @return */ - public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) { +// public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) {//20201018注释掉 + public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018-新增发送时间 HttpPost httpPost = null; urlProducer = urlProducer.trim(); byte[] resultArray = null; @@ -868,11 +869,13 @@ public class HttpClientUtil { // logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志 logger.debug("原始数据==>" + dataJson + "<==," + "原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + + "数据处理时间handleTime==>" + (System.currentTimeMillis() - postTime) + "<==," + "最终加载内容字节数组长度: " + resultArray.length + "," + "最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志 //执行请求 - AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count); +// AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count); + AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);//20201018新增发送时间 httpClient.execute(httpPost, asyncPostMsgCallBack); } catch (MalformedURLException e) { diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java index da23dfd..cb85c6d 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java @@ -14,6 +14,7 @@ import org.apache.http.concurrent.FutureCallback; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.Arrays; import java.util.Map; @@ -29,14 +30,17 @@ public class AsyncPostMsgCallBack implements FutureCallback { private String userAgent; private String msgSessionCookie; private int count; + private long postTime;//20201018新增发送时间 - public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) { + // public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) { + public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018新增发送时间 this.postMsgUrl = postMsgUrl; this.topicName = topicName; this.dataJson = dataJson; this.userAgent = userAgent; this.msgSessionCookie = msgSessionCookie; this.count = count; + this.postTime = postTime; } public String getPostMsgUrl() { @@ -87,6 +91,14 @@ public class AsyncPostMsgCallBack implements FutureCallback { this.count = count; } + public long getPostTime() { + return postTime; + } + + public void setPostTime(long postTime) { + this.postTime = postTime; + } + /** * 请求完成后调用该函数 */ @@ -96,14 +108,21 @@ public class AsyncPostMsgCallBack implements FutureCallback { try { int statuCode = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); - String ret = EntityUtils.toString(entity); + String ret = null; + if (entity != null) { +// ret = EntityUtils.toString(entity);//旧-20201018移除 + ret = EntityUtils.toString(entity, "UTF-8");//20201018新增修改,添加UTF-8 + } logger.info("返回的生产原始响应体String数据为:" + ret); /** * 不直接进行对象转换,除非数据加载不成功 */ Map map = JSONObject.parseObject(ret, Map.class); - int resRedirBodyCode = (int) map.get("code"); + int resRedirBodyCode = 0; + if (map != null) { + resRedirBodyCode = (int) map.get("code"); + } // int resRedirBodyCode = resRedirBody.getCode(); logger.debug("生产数据==>" + dataJson + "<==," + @@ -116,10 +135,11 @@ public class AsyncPostMsgCallBack implements FutureCallback { logger.info("数据加载成功,返回码: " + statuCode); logger.debug("生产数据==>" + dataJson + "<==," + "生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + + "服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," + "数据加载成功,返回码: " + statuCode); AvroMonitorTimerTask.msgSuccessSum++; - EntityUtils.consume(entity); +// EntityUtils.consume(entity); } else { switch (resRedirBodyCode) { case 300: @@ -155,15 +175,23 @@ public class AsyncPostMsgCallBack implements FutureCallback { AvroMonitorTimerTask.msgFailedSum++; break; } - EntityUtils.consume(entity); +// EntityUtils.consume(entity); + } + + if (entity != null) { + try { + EntityUtils.consume(entity);//20201018移到外部 + } catch (final IOException ex) { + } } } catch (Exception e) { logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志 e.printStackTrace(); + } finally { + HttpClientUtils.closeQuietly(response);//20201018移入finally +// EntityUtils.consumeQuietly(response.getEntity()); } - - HttpClientUtils.closeQuietly(response); } /** @@ -185,7 +213,8 @@ public class AsyncPostMsgCallBack implements FutureCallback { AvroMonitorTimerTask.msgFailedSum++; logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); } else { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试 +// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试//20201018新增发送时间 } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 5ba13ff..739f7e0 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -346,7 +346,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { AvroMonitorTimerTask.msgReadyPostSum += contents.size(); for (String content : contents) { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 +// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间 } } else {//sessionCookie不为空 logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); @@ -356,7 +357,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { @Override public void run() { try { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 +// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间 } catch (Exception e) { logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); } @@ -420,7 +422,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { redirectContents.clear(); AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size(); for (String content : tmpListFreq) { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0 +// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0 + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0//20201018新增发送时间 } logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==."); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java index 505407c..fd34e71 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java @@ -57,7 +57,8 @@ public class AvroMonitorTimerTask { if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报 String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428 logger.info("Send monitor message is===>>>" + sendMsg + "<<<==="); - HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0 +// HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0 + HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0//20201018新增发送时间 } } catch (Exception e) { logger.error("Send monitorMsg to zx is error===>>>" + e + "<==="); @@ -74,11 +75,14 @@ public class AvroMonitorTimerTask { //发送消息统计情况-warn类型便于脚本收集信息打印 logger.warn("last min " + lastMinTime + " monitorMsg count==>msgSuccessSum:{ " + msgSuccessSum + " },==>msgFailedSum:{ " + msgFailedSum + " },==>msgReadyPostSum:{ " + msgReadyPostSum + " },==>msgTotalSum:{ " + msgTotalSum + " }."); // } - //重置为0 - msgSuccessSum = 0; - msgFailedSum = 0; - msgTotalSum = 0; - msgReadyPostSum = 0; + + if (RealtimeCountConfig.MONITOR_CLEAN_TYPE == 1) {//1表示定时重置 + //重置为0 + msgSuccessSum = 0; + msgFailedSum = 0; + msgTotalSum = 0; + msgReadyPostSum = 0; + } } }, 60000, 60000); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/daoUtils/RealtimeCountConfig.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/daoUtils/RealtimeCountConfig.java index 050c86d..257a7b7 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/daoUtils/RealtimeCountConfig.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/daoUtils/RealtimeCountConfig.java @@ -34,6 +34,7 @@ public class RealtimeCountConfig implements Serializable { * 监控器类型:一部(yb):状态上报;广东(gd):日志打印,然后靠脚本收集到influxdb上传(外部脚本完成) */ public static final String MONITOR_TYPE = RealtimeCountConfigurations.getStringProperty(0, "monitor.type"); + public static final Integer MONITOR_CLEAN_TYPE = RealtimeCountConfigurations.getIntProperty(0, "monitor.clean.type"); /** * 状态上报所需参数-仅一部-系统组件编码-目前24832-20191222 diff --git a/yb_http_avro_sink_file/src/main/resources/flume_config.properties b/yb_http_avro_sink_file/src/main/resources/flume_config.properties index f5eacf4..a1bc883 100644 --- a/yb_http_avro_sink_file/src/main/resources/flume_config.properties +++ b/yb_http_avro_sink_file/src/main/resources/flume_config.properties @@ -2,19 +2,19 @@ http.async.parallelism=10 #异步Http客户端-等待数据超时时间,根据业务调整 -http.async.socketTimeout=60000 +http.async.socketTimeout=10000 #异步Http客户端-连接超时时间 -http.async.connectTimeout=60000 +http.async.connectTimeout=10000 #异步Http客户端-连接池最大连接数 -http.async.poolSize=5000 +http.async.poolSize=80 #异步Http客户端-每个主机的并发最多只有1500 -http.async.maxPerRoute=2500 +http.async.maxPerRoute=80 #异步Http客户端-从连接池中后去连接的timeout时间 -http.async.connectionRequestTimeout=90000 +http.async.connectionRequestTimeout=15000 #Schema配置信息 diff --git a/yb_http_avro_sink_file/src/main/resources/realtime_service_config.properties b/yb_http_avro_sink_file/src/main/resources/realtime_service_config.properties index b4d124a..54a1902 100644 --- a/yb_http_avro_sink_file/src/main/resources/realtime_service_config.properties +++ b/yb_http_avro_sink_file/src/main/resources/realtime_service_config.properties @@ -93,3 +93,5 @@ schema.id.ntc-collect-telnet-log=6 schema.id.monitor-msg=22 schema.id.influx-sapp-bps-stat-log=22 +#监控生产数是否每分钟重置:1-重置为0(每分钟值);2-不重置(启动后累计值) +monitor.clean.type=1 \ No newline at end of file