From 8bf59b79611d4a699c98df6b9765eee23f566996 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Sat, 24 Oct 2020 00:22:30 +0800 Subject: [PATCH] =?UTF-8?q?fix(batch):=20=E8=AE=A1=E6=95=B0=20=E4=B8=8E?= =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E9=80=BB=E8=BE=91=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sink/HttpAsyncUtils/HttpAsyncClient.java | 1 + .../msgCallBack/AsyncPostMsgCallBack.java | 10 +- .../iie/cusflume/sink/YbHttpAvroSinkFile.java | 93 ++++++++----------- .../sink/avroUtils/AvroMonitorTimerTask.java | 20 ++++ 4 files changed, 68 insertions(+), 56 deletions(-) diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java index 3d90ec7..d47c4c2 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java @@ -203,6 +203,7 @@ public class HttpAsyncClient { .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) .setDefaultCookieStore(new BasicCookieStore()) + .setDefaultRequestConfig(requestConfig) .setKeepAliveStrategy(kaStrategy) .build(); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java index 7119684..0f6e87e 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java @@ -136,8 +136,10 @@ public class AsyncPostMsgCallBack implements FutureCallback { "服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," + "数据加载成功,返回码: " + statuCode); - AvroMonitorTimerTask.msgSuccessSum+=count; + AvroMonitorTimerTask.addSuccessNum(count); + } else { + AvroMonitorTimerTask.addFailedNum(count); switch (resRedirBodyCode) { case 300: logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); @@ -167,8 +169,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { YbHttpAvroSinkFile.updateCookie(); break; default: - logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode + "---失败数据(第一条):\n" + dataJson); - AvroMonitorTimerTask.msgFailedSum+= count; + logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode); break; } } @@ -204,7 +205,8 @@ public class AsyncPostMsgCallBack implements FutureCallback { count++; logger.info("AsyncPostMagCallBack Request is Failed,This Failed data is ==>" + dataJson + "<==,Retry count=" + count); if (count > 1) { - AvroMonitorTimerTask.msgFailedSum++; + + AvroMonitorTimerTask.addFailedNum(1); logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); } else { HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试 diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 1d6189e..2002b8e 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -318,10 +318,13 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private void sendMsgLog(Transaction transaction, List contents) { try { - //获取状态回传sessionID - if (StringUtils.isBlank(monitorSessionCookie)) { + /** + * 获取状态回传sessionID,检查认证是否存在 + */ + if (StringUtils.isBlank(monitorSessionCookie) + || StringUtils.isBlank(msgSessionCookie)) { getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie - + getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 if (!checkTimerStart) { checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie checkTimerStart = true; @@ -334,12 +337,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { monitorStart = true; logger.warn("MonitorMsg Timer is started......"); } - } - - AvroMonitorTimerTask.msgTotalSum += contents.size();//消息使用,文件+消息不使用 - //检查认证是否存在 - if (StringUtils.isBlank(msgSessionCookie)) { - getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 /** * 开启定时扫描重定向数据集合 @@ -349,62 +346,44 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { redirectContentsPostStart = true; logger.warn("RedirectContents Timer Post is started......"); } + } - AvroMonitorTimerTask.msgReadyPostSum += contents.size(); - if (isFile(topicName)) { - for (String content : contents) { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, - userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 - } - } else { - byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); - HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 - } - } else {//sessionCookie不为空 - logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); - AvroMonitorTimerTask.msgReadyPostSum += contents.size(); + AvroMonitorTimerTask.addTotalNum(contents.size()); + logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); + AvroMonitorTimerTask.addReadyPostNum(contents.size()); + if (isSingle(topicName)) { + for (String content : contents) { - //Router 接口,指定message 与 Files - if (isFile(topicName)) { - for (String content : contents) { - - pool.execute(new Runnable() { - @Override - public void run() { - try { - - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, - userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 - } catch (Exception e) { - logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); - } - } - }); - } - } else { - byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); pool.execute(new Runnable() { @Override public void run() { try { - HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 + + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, + userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 } catch (Exception e) { logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); } } }); - - } - - - - + } else { + byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); + pool.execute(new Runnable() { + @Override + public void run() { + try { + HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0), + userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 + } catch (Exception e) { + logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); + } + } + }); } + } catch (Exception e) { logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); transaction.commit(); @@ -415,15 +394,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } } - private static boolean isFile(String topicName) { - if (topicName.equals("NTC-COLLECT-FILE-LOG")) + private static boolean isSingle(String topicName) { + + if (topicName.equals("NTC-COLLECT-FILE-LOG") + || topicName.equals("NTC-COLLECT-HTTP-DOC-LOG") + || topicName.equals("NTC-COLLECT-HTTP-AV-LOG") + || topicName.equals("NTC-COLLECT-FTP-DOC-LOG") + || topicName.equals("NTC-COLLECT-MAIL-LOG") + || topicName.equals("NTC-COLLECT-TELNET-LOG") + || topicName.equals("INFLUX-SAPP-BPS-STAT-LOG")) //监控消息,几分钟1条 + return true; else { return false; } } + + /** * 总线入库前验证 * diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java index 99fca78..c77a83f 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java @@ -42,6 +42,26 @@ public class AvroMonitorTimerTask { public static boolean startFileMonitor = false;//默认false + + public static synchronized void addSuccessNum(int count) { + msgSuccessSum += count; + } + + public static synchronized void addFailedNum(int count) { + msgFailedSum += count; + } + public static synchronized void addTotalNum(int count) { + msgTotalSum += count; + } + + public static synchronized void addReadyPostNum(int count) { + msgReadyPostSum += count; + } + + + + + /** * 消息---Java自定义定时器 */