diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java index c80e16e..9620be9 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java @@ -777,7 +777,7 @@ public class HttpClientUtil { byte[] dataLengthByteArray = null; try { Schema schemaDataAvro = getSchemaFromHashMap(topic); - logger.info(topic + ": 消息批量组装数量" + contents.size()); + for (String dataJson : contents) { byte[] resultArray = null; ByteArrayOutputStream outAvro = new ByteArrayOutputStream(); @@ -804,9 +804,8 @@ public class HttpClientUtil { results = byteMerger(results, dataLengthByteArray, resultArray); } - } - logger.info(topic + "-组装后占用Bytes:" + " , " + results.length); + logger.info(topic + "数据封装记录 "+ contents.size() + " 条,字节数为:" + " , " + results.length); } catch (Exception e) { logger.error("Encapsulation Message Data Error " + e); } @@ -831,7 +830,7 @@ public class HttpClientUtil { httpPost.addHeader("Cookie", msgSessionCookie); String md5Avro = MD5Utils.md5Encode(results); httpPost.addHeader("Checksum", md5Avro); - logger.info("批量发送,总体请求端的Checksum MD5 avro 加密为:" + md5Avro); + logger.info("批量发送body Checksum MD5 为:" + md5Avro); httpPost.addHeader("Content-Type", "binary/octet-stream"); httpPost.addHeader("X-Tag", getXTAG(dataJson, topic)); ByteArrayEntity payload = new ByteArrayEntity(results); diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java index 0f6e87e..af7670c 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java @@ -133,7 +133,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { logger.info("数据加载成功,返回码: " + statuCode); logger.debug("生产数据==>" + dataJson + "<==," + "生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + - "服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + "数据加载成功,返回码: " + statuCode); AvroMonitorTimerTask.addSuccessNum(count); @@ -142,7 +142,9 @@ public class AsyncPostMsgCallBack implements FutureCallback { AvroMonitorTimerTask.addFailedNum(count); switch (resRedirBodyCode) { case 300: - logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); + logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); //若不包含对应字段,则不进行对象转换,减少报错 if (ret.contains("redirect")) { ResRedirBody resRedirBody = JSONObject.parseObject(ret, ResRedirBody.class); @@ -157,19 +159,27 @@ public class AsyncPostMsgCallBack implements FutureCallback { YbHttpAvroSinkFile.redirectContents.add(dataJson); break; case 301: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); + logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); YbHttpAvroSinkFile.redirectContents.add(dataJson); break; case 410: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); + logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); YbHttpAvroSinkFile.updateCookie(); break; case 500: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:500,处理请求过程出现系统错误."); + logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:500,处理请求过程出现系统错误."); YbHttpAvroSinkFile.updateCookie(); break; default: - logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode); + logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode); break; } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 2002b8e..808f8cd 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -369,7 +369,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { }); } } else { + long beginAvroTime = System.currentTimeMillis(); byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); + logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) ); pool.execute(new Runnable() { @Override public void run() { @@ -383,7 +385,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { }); } - } catch (Exception e) { logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); transaction.commit(); @@ -391,6 +392,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { if (transaction != null) { transaction.commit(); } + AvroMonitorTimerTask.subReadyPostNum(contents.size()); } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java index c77a83f..d62c81a 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java @@ -59,6 +59,17 @@ public class AvroMonitorTimerTask { } + public static synchronized void subReadyPostNum(int count) { + if ( msgReadyPostSum >= count) { + msgReadyPostSum -= count; + } else { + msgReadyPostSum = 0; + } + + } + + +