202010202006更新-现场联调后代码调优

This commit is contained in:
caohui
2020-10-20 20:06:51 +08:00
parent a157c81b5c
commit 93b3c7e750
8 changed files with 105 additions and 31 deletions

View File

@@ -1,8 +1,7 @@
package cn.ac.iie.cusflume.sink.HttpAsyncUtils;
import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig;
import org.apache.http.Consts;
import org.apache.http.HttpHost;
import org.apache.http.*;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.MalformedChallengeException;
@@ -14,9 +13,9 @@ import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.impl.auth.*;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.BasicCredentialsProvider;
@@ -25,11 +24,14 @@ import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@@ -101,7 +103,9 @@ public class HttpAsyncClient {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(connectionRequestTimeout)
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout).build();
.setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(true)//逐出已被关闭的链接-20201017
.build();
SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
X509TrustManager tm = new X509TrustManager() {
@@ -135,7 +139,10 @@ public class HttpAsyncClient {
.build();
// 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true)
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
// .setSoKeepAlive(false)//20201017移除
.setSoKeepAlive(true)//20201017新修改
.setTcpNoDelay(true)
.setIoThreadCount(Runtime.getRuntime().availableProcessors())
.build();
// 设置连接池大小
@@ -169,18 +176,43 @@ public class HttpAsyncClient {
.build();
conMgr.setDefaultConnectionConfig(connectionConfig);
/**
* 定义一个strategy-keep-alive-20201017
*/
ConnectionKeepAliveStrategy kaStrategy = new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase
("timeout")) {
return Long.parseLong(value) * 1000;
}
}
return 10 * 1000;//如果没有约定则默认定义时长为10s
}
};
if (proxy) {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setProxy(new HttpHost(host, port))
.setDefaultCookieStore(new BasicCookieStore())
.setDefaultRequestConfig(requestConfig).build();
.setDefaultRequestConfig(requestConfig)
.setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017
.build();
} else {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new BasicCookieStore()).build();
.setDefaultCookieStore(new BasicCookieStore())
.setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017
.build();
}
}

View File

@@ -778,7 +778,8 @@ public class HttpClientUtil {
* @param msgSessionCookie
* @return
*/
public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) {
// public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) {//20201018注释掉
public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018-新增发送时间
HttpPost httpPost = null;
urlProducer = urlProducer.trim();
byte[] resultArray = null;
@@ -868,11 +869,13 @@ public class HttpClientUtil {
// logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志
logger.debug("原始数据==>" + dataJson + "<==," +
"原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
"数据处理时间handleTime==>" + (System.currentTimeMillis() - postTime) + "<==," +
"最终加载内容字节数组长度: " + resultArray.length + "," +
"最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志
//执行请求
AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count);
// AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count);
AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);//20201018新增发送时间
httpClient.execute(httpPost, asyncPostMsgCallBack);
} catch (MalformedURLException e) {

View File

@@ -14,6 +14,7 @@ import org.apache.http.concurrent.FutureCallback;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@@ -29,14 +30,17 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
private String userAgent;
private String msgSessionCookie;
private int count;
private long postTime;//20201018新增发送时间
public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) {
// public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) {
public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018新增发送时间
this.postMsgUrl = postMsgUrl;
this.topicName = topicName;
this.dataJson = dataJson;
this.userAgent = userAgent;
this.msgSessionCookie = msgSessionCookie;
this.count = count;
this.postTime = postTime;
}
public String getPostMsgUrl() {
@@ -87,6 +91,14 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
this.count = count;
}
public long getPostTime() {
return postTime;
}
public void setPostTime(long postTime) {
this.postTime = postTime;
}
/**
* 请求完成后调用该函数
*/
@@ -96,14 +108,21 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
try {
int statuCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
String ret = EntityUtils.toString(entity);
String ret = null;
if (entity != null) {
// ret = EntityUtils.toString(entity);//旧-20201018移除
ret = EntityUtils.toString(entity, "UTF-8");//20201018新增修改添加UTF-8
}
logger.info("返回的生产原始响应体String数据为:" + ret);
/**
* 不直接进行对象转换,除非数据加载不成功
*/
Map map = JSONObject.parseObject(ret, Map.class);
int resRedirBodyCode = (int) map.get("code");
int resRedirBodyCode = 0;
if (map != null) {
resRedirBodyCode = (int) map.get("code");
}
// int resRedirBodyCode = resRedirBody.getCode();
logger.debug("生产数据==>" + dataJson + "<==," +
@@ -116,10 +135,11 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
logger.info("数据加载成功,返回码: " + statuCode);
logger.debug("生产数据==>" + dataJson + "<==," +
"生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
"服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," +
"数据加载成功,返回码: " + statuCode);
AvroMonitorTimerTask.msgSuccessSum++;
EntityUtils.consume(entity);
// EntityUtils.consume(entity);
} else {
switch (resRedirBodyCode) {
case 300:
@@ -155,15 +175,23 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
AvroMonitorTimerTask.msgFailedSum++;
break;
}
EntityUtils.consume(entity);
// EntityUtils.consume(entity);
}
if (entity != null) {
try {
EntityUtils.consume(entity);//20201018移到外部
} catch (final IOException ex) {
}
}
} catch (Exception e) {
logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志
e.printStackTrace();
} finally {
HttpClientUtils.closeQuietly(response);//20201018移入finally
// EntityUtils.consumeQuietly(response.getEntity());
}
HttpClientUtils.closeQuietly(response);
}
/**
@@ -185,7 +213,8 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
AvroMonitorTimerTask.msgFailedSum++;
logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString());
} else {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试//20201018新增发送时间
}
}

View File

@@ -346,7 +346,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
AvroMonitorTimerTask.msgReadyPostSum += contents.size();
for (String content : contents) {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间
}
} else {//sessionCookie不为空
logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
@@ -356,7 +357,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
@Override
public void run() {
try {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间
} catch (Exception e) {
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
}
@@ -420,7 +422,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
redirectContents.clear();
AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size();
for (String content : tmpListFreq) {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0//20201018新增发送时间
}
logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==.");
}

View File

@@ -57,7 +57,8 @@ public class AvroMonitorTimerTask {
if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报
String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428
logger.info("Send monitor message is===>>>" + sendMsg + "<<<===");
HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0
// HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0//20201018新增发送时间
}
} catch (Exception e) {
logger.error("Send monitorMsg to zx is error===>>>" + e + "<===");
@@ -74,11 +75,14 @@ public class AvroMonitorTimerTask {
//发送消息统计情况-warn类型便于脚本收集信息打印
logger.warn("last min " + lastMinTime + " monitorMsg count==>msgSuccessSum:{ " + msgSuccessSum + " },==>msgFailedSum:{ " + msgFailedSum + " },==>msgReadyPostSum:{ " + msgReadyPostSum + " },==>msgTotalSum:{ " + msgTotalSum + " }.");
// }
//重置为0
msgSuccessSum = 0;
msgFailedSum = 0;
msgTotalSum = 0;
msgReadyPostSum = 0;
if (RealtimeCountConfig.MONITOR_CLEAN_TYPE == 1) {//1表示定时重置
//重置为0
msgSuccessSum = 0;
msgFailedSum = 0;
msgTotalSum = 0;
msgReadyPostSum = 0;
}
}
}, 60000, 60000);
}

View File

@@ -34,6 +34,7 @@ public class RealtimeCountConfig implements Serializable {
* 监控器类型:一部(yb):状态上报;广东(gd):日志打印,然后靠脚本收集到influxdb上传(外部脚本完成)
*/
public static final String MONITOR_TYPE = RealtimeCountConfigurations.getStringProperty(0, "monitor.type");
public static final Integer MONITOR_CLEAN_TYPE = RealtimeCountConfigurations.getIntProperty(0, "monitor.clean.type");
/**
* 状态上报所需参数-仅一部-系统组件编码-目前24832-20191222

View File

@@ -2,19 +2,19 @@
http.async.parallelism=10
#异步Http客户端-等待数据超时时间,根据业务调整
http.async.socketTimeout=60000
http.async.socketTimeout=10000
#异步Http客户端-连接超时时间
http.async.connectTimeout=60000
http.async.connectTimeout=10000
#异步Http客户端-连接池最大连接数
http.async.poolSize=5000
http.async.poolSize=80
#异步Http客户端-每个主机的并发最多只有1500
http.async.maxPerRoute=2500
http.async.maxPerRoute=80
#异步Http客户端-从连接池中后去连接的timeout时间
http.async.connectionRequestTimeout=90000
http.async.connectionRequestTimeout=15000
#Schema配置信息

View File

@@ -93,3 +93,5 @@ schema.id.ntc-collect-telnet-log=6
schema.id.monitor-msg=22
schema.id.influx-sapp-bps-stat-log=22
#监控生产数是否每分钟重置1-重置为0(每分钟值)2-不重置(启动后累计值)
monitor.clean.type=1