202010211232更新

This commit is contained in:
caohui
2020-10-21 17:06:05 +08:00
parent 8bc6169af8
commit c72c66f76f
5 changed files with 20 additions and 41 deletions

View File

@@ -49,13 +49,6 @@ import java.security.cert.X509Certificate;
*/ */
public class HttpAsyncClient { public class HttpAsyncClient {
/*
private static int socketTimeout = 60000;//设置等待数据超时时间60秒钟 根据业务调整
private static int connectTimeout = 60000;//连接超时
private static int poolSize = 5000;//连接池最大连接数
private static int maxPerRoute = 2500;//每个主机的并发最多只有1500
private static int connectionRequestTimeout = 90000; //从连接池中后去连接的timeout时间
*/
private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整 private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整
private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时 private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时
private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数 private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数
@@ -85,7 +78,7 @@ public class HttpAsyncClient {
} }
/** /**
* 新版createAsyncClient(boolean proxy)---20200425注释 * 新版createAsyncClient(boolean proxy)
* @param proxy * @param proxy
* @return * @return
* @throws KeyManagementException * @throws KeyManagementException
@@ -104,7 +97,7 @@ public class HttpAsyncClient {
.setConnectionRequestTimeout(connectionRequestTimeout) .setConnectionRequestTimeout(connectionRequestTimeout)
.setConnectTimeout(connectTimeout) .setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout) .setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(true)//逐出已被关闭的链接-20201017 .setStaleConnectionCheckEnabled(true)
.build(); .build();
SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
@@ -140,8 +133,7 @@ public class HttpAsyncClient {
// 配置io线程 // 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom() IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
// .setSoKeepAlive(false)//20201017移除 .setSoKeepAlive(true)
.setSoKeepAlive(true)//20201017新修改
.setTcpNoDelay(true) .setTcpNoDelay(true)
.setIoThreadCount(Runtime.getRuntime().availableProcessors()) .setIoThreadCount(Runtime.getRuntime().availableProcessors())
.build(); .build();
@@ -177,7 +169,7 @@ public class HttpAsyncClient {
conMgr.setDefaultConnectionConfig(connectionConfig); conMgr.setDefaultConnectionConfig(connectionConfig);
/** /**
* 定义一个strategy-keep-alive-20201017 * 定义一个strategy
*/ */
ConnectionKeepAliveStrategy kaStrategy = new ConnectionKeepAliveStrategy() { ConnectionKeepAliveStrategy kaStrategy = new ConnectionKeepAliveStrategy() {
@Override @Override
@@ -204,14 +196,14 @@ public class HttpAsyncClient {
.setProxy(new HttpHost(host, port)) .setProxy(new HttpHost(host, port))
.setDefaultCookieStore(new BasicCookieStore()) .setDefaultCookieStore(new BasicCookieStore())
.setDefaultRequestConfig(requestConfig) .setDefaultRequestConfig(requestConfig)
.setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 .setKeepAliveStrategy(kaStrategy)
.build(); .build();
} else { } else {
return HttpAsyncClients.custom().setConnectionManager(conMgr) return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider) .setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry) .setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new BasicCookieStore()) .setDefaultCookieStore(new BasicCookieStore())
.setKeepAliveStrategy(kaStrategy)//增加keep-alive配置-20201017 .setKeepAliveStrategy(kaStrategy)
.build(); .build();
} }

View File

@@ -778,8 +778,7 @@ public class HttpClientUtil {
* @param msgSessionCookie * @param msgSessionCookie
* @return * @return
*/ */
// public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count) {//20201018注释掉 public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018-新增发送时间
HttpPost httpPost = null; HttpPost httpPost = null;
urlProducer = urlProducer.trim(); urlProducer = urlProducer.trim();
byte[] resultArray = null; byte[] resultArray = null;
@@ -828,8 +827,8 @@ public class HttpClientUtil {
// set header // set header
httpPost.addHeader("User-Agent", userAgent); httpPost.addHeader("User-Agent", userAgent);
// httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);//关闭长连接-20191225新增-使用字符代替 // httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);//关闭长连接
httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);//打开长连接-20201017修改-使用字符代替 httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);//打开长连接
//一部状态回传需要此Cookie打开 //一部状态回传需要此Cookie打开
// if ("monitor-msg".equals(topic)) { // if ("monitor-msg".equals(topic)) {
// httpPost.addHeader("Cookie", msgSessionCookie);//广东文件消息测试时加上Cookie会验证不通过,即那边显示为两个Cookie,不加Cookie则验证通过 // httpPost.addHeader("Cookie", msgSessionCookie);//广东文件消息测试时加上Cookie会验证不通过,即那边显示为两个Cookie,不加Cookie则验证通过
@@ -874,8 +873,7 @@ public class HttpClientUtil {
"最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志 "最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志
//执行请求 //执行请求
// AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count); AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);//20201018新增发送时间
httpClient.execute(httpPost, asyncPostMsgCallBack); httpClient.execute(httpPost, asyncPostMsgCallBack);
} catch (MalformedURLException e) { } catch (MalformedURLException e) {

View File

@@ -30,10 +30,9 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
private String userAgent; private String userAgent;
private String msgSessionCookie; private String msgSessionCookie;
private int count; private int count;
private long postTime;//20201018新增发送时间 private long postTime;
// public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count) { public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {//20201018新增发送时间
this.postMsgUrl = postMsgUrl; this.postMsgUrl = postMsgUrl;
this.topicName = topicName; this.topicName = topicName;
this.dataJson = dataJson; this.dataJson = dataJson;
@@ -110,8 +109,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();
String ret = null; String ret = null;
if (entity != null) { if (entity != null) {
// ret = EntityUtils.toString(entity);//旧-20201018移除 ret = EntityUtils.toString(entity, "UTF-8");
ret = EntityUtils.toString(entity, "UTF-8");//20201018新增修改添加UTF-8
} }
logger.info("返回的生产原始响应体String数据为:" + ret); logger.info("返回的生产原始响应体String数据为:" + ret);
@@ -139,7 +137,6 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
"数据加载成功,返回码: " + statuCode); "数据加载成功,返回码: " + statuCode);
AvroMonitorTimerTask.msgSuccessSum++; AvroMonitorTimerTask.msgSuccessSum++;
// EntityUtils.consume(entity);
} else { } else {
switch (resRedirBodyCode) { switch (resRedirBodyCode) {
case 300: case 300:
@@ -163,7 +160,6 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
break; break;
case 410: case 410:
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
// YbHttpAvroSinkFile.redirectContents.add(dataJson);
YbHttpAvroSinkFile.updateCookie(); YbHttpAvroSinkFile.updateCookie();
break; break;
case 500: case 500:
@@ -175,12 +171,11 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
AvroMonitorTimerTask.msgFailedSum++; AvroMonitorTimerTask.msgFailedSum++;
break; break;
} }
// EntityUtils.consume(entity);
} }
if (entity != null) { if (entity != null) {
try { try {
EntityUtils.consume(entity);//20201018移到外部 EntityUtils.consume(entity);
} catch (final IOException ex) { } catch (final IOException ex) {
} }
} }
@@ -189,8 +184,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志 logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
HttpClientUtils.closeQuietly(response);//20201018移入finally HttpClientUtils.closeQuietly(response);
// EntityUtils.consumeQuietly(response.getEntity());
} }
} }
@@ -213,8 +207,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
AvroMonitorTimerTask.msgFailedSum++; AvroMonitorTimerTask.msgFailedSum++;
logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString());
} else { } else {
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count);//failed失败时重试 HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试//20201018新增发送时间
} }
} }

View File

@@ -346,8 +346,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
AvroMonitorTimerTask.msgReadyPostSum += contents.size(); AvroMonitorTimerTask.msgReadyPostSum += contents.size();
for (String content : contents) { for (String content : contents) {
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间
} }
} else {//sessionCookie不为空 } else {//sessionCookie不为空
logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
@@ -357,8 +356,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
@Override @Override
public void run() { public void run() {
try { try {
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//初始发送count计数为0 HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//初始发送count计数为0//20201018新增发送时间
} catch (Exception e) { } catch (Exception e) {
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
} }
@@ -422,8 +420,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
redirectContents.clear(); redirectContents.clear();
AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size(); AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size();
for (String content : tmpListFreq) { for (String content : tmpListFreq) {
// HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0);//postRedirectDataEveryMin定时器-初始发送count计数为0 HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0//20201018新增发送时间
} }
logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==."); logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==.");
} }

View File

@@ -57,8 +57,7 @@ public class AvroMonitorTimerTask {
if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报 if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报
String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428 String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428
logger.info("Send monitor message is===>>>" + sendMsg + "<<<==="); logger.info("Send monitor message is===>>>" + sendMsg + "<<<===");
// HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0);//静态方法无返回值用于多线程,初始发送count计数为0 HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0
HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0//20201018新增发送时间
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Send monitorMsg to zx is error===>>>" + e + "<==="); logger.error("Send monitorMsg to zx is error===>>>" + e + "<===");