fix(batch): 计数 与业务逻辑优化

This commit is contained in:
doufenghu
2020-10-24 00:22:30 +08:00
parent 8a780334af
commit 8bf59b7961
4 changed files with 68 additions and 56 deletions

View File

@@ -203,6 +203,7 @@ public class HttpAsyncClient {
.setDefaultCredentialsProvider(credentialsProvider) .setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry) .setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new BasicCookieStore()) .setDefaultCookieStore(new BasicCookieStore())
.setDefaultRequestConfig(requestConfig)
.setKeepAliveStrategy(kaStrategy) .setKeepAliveStrategy(kaStrategy)
.build(); .build();
} }

View File

@@ -136,8 +136,10 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
"服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," + "服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," +
"数据加载成功,返回码: " + statuCode); "数据加载成功,返回码: " + statuCode);
AvroMonitorTimerTask.msgSuccessSum+=count; AvroMonitorTimerTask.addSuccessNum(count);
} else { } else {
AvroMonitorTimerTask.addFailedNum(count);
switch (resRedirBodyCode) { switch (resRedirBodyCode) {
case 300: case 300:
logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
@@ -167,8 +169,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
YbHttpAvroSinkFile.updateCookie(); YbHttpAvroSinkFile.updateCookie();
break; break;
default: default:
logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode + "---失败数据(第一条):\n" + dataJson); logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode);
AvroMonitorTimerTask.msgFailedSum+= count;
break; break;
} }
} }
@@ -204,7 +205,8 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
count++; count++;
logger.info("AsyncPostMagCallBack Request is Failed,This Failed data is ==>" + dataJson + "<==,Retry count=" + count); logger.info("AsyncPostMagCallBack Request is Failed,This Failed data is ==>" + dataJson + "<==,Retry count=" + count);
if (count > 1) { if (count > 1) {
AvroMonitorTimerTask.msgFailedSum++;
AvroMonitorTimerTask.addFailedNum(1);
logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString());
} else { } else {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试 HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试

View File

@@ -318,10 +318,13 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
private void sendMsgLog(Transaction transaction, List<String> contents) { private void sendMsgLog(Transaction transaction, List<String> contents) {
try { try {
//获取状态回传sessionID /**
if (StringUtils.isBlank(monitorSessionCookie)) { * 获取状态回传sessionID,检查认证是否存在
*/
if (StringUtils.isBlank(monitorSessionCookie)
|| StringUtils.isBlank(msgSessionCookie)) {
getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie
getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证
if (!checkTimerStart) { if (!checkTimerStart) {
checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie
checkTimerStart = true; checkTimerStart = true;
@@ -334,12 +337,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
monitorStart = true; monitorStart = true;
logger.warn("MonitorMsg Timer is started......"); logger.warn("MonitorMsg Timer is started......");
} }
}
AvroMonitorTimerTask.msgTotalSum += contents.size();//消息使用,文件+消息不使用
//检查认证是否存在
if (StringUtils.isBlank(msgSessionCookie)) {
getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证
/** /**
* 开启定时扫描重定向数据集合 * 开启定时扫描重定向数据集合
@@ -349,25 +346,13 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
redirectContentsPostStart = true; redirectContentsPostStart = true;
logger.warn("RedirectContents Timer Post is started......"); logger.warn("RedirectContents Timer Post is started......");
} }
AvroMonitorTimerTask.msgReadyPostSum += contents.size();
if (isFile(topicName)) {
for (String content : contents) {
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content,
userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0
}
} else {
byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0),
userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0
} }
} else {//sessionCookie不为空
AvroMonitorTimerTask.addTotalNum(contents.size());
logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
AvroMonitorTimerTask.msgReadyPostSum += contents.size(); AvroMonitorTimerTask.addReadyPostNum(contents.size());
if (isSingle(topicName)) {
//Router 接口指定message 与 Files
if (isFile(topicName)) {
for (String content : contents) { for (String content : contents) {
pool.execute(new Runnable() { pool.execute(new Runnable() {
@@ -396,15 +381,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
} }
} }
}); });
} }
}
} catch (Exception e) { } catch (Exception e) {
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
transaction.commit(); transaction.commit();
@@ -415,15 +394,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
} }
} }
private static boolean isFile(String topicName) {
if (topicName.equals("NTC-COLLECT-FILE-LOG")) private static boolean isSingle(String topicName) {
if (topicName.equals("NTC-COLLECT-FILE-LOG")
|| topicName.equals("NTC-COLLECT-HTTP-DOC-LOG")
|| topicName.equals("NTC-COLLECT-HTTP-AV-LOG")
|| topicName.equals("NTC-COLLECT-FTP-DOC-LOG")
|| topicName.equals("NTC-COLLECT-MAIL-LOG")
|| topicName.equals("NTC-COLLECT-TELNET-LOG")
|| topicName.equals("INFLUX-SAPP-BPS-STAT-LOG")) //监控消息几分钟1条
return true; return true;
else { else {
return false; return false;
} }
} }
/** /**
* 总线入库前验证 * 总线入库前验证
* *

View File

@@ -42,6 +42,26 @@ public class AvroMonitorTimerTask {
public static boolean startFileMonitor = false;//默认false public static boolean startFileMonitor = false;//默认false
public static synchronized void addSuccessNum(int count) {
msgSuccessSum += count;
}
public static synchronized void addFailedNum(int count) {
msgFailedSum += count;
}
public static synchronized void addTotalNum(int count) {
msgTotalSum += count;
}
public static synchronized void addReadyPostNum(int count) {
msgReadyPostSum += count;
}
/** /**
* 消息---Java自定义定时器 * 消息---Java自定义定时器
*/ */