fix(batch): 增加计数,序列化时间延迟调试参数
This commit is contained in:
@@ -777,7 +777,7 @@ public class HttpClientUtil {
|
||||
byte[] dataLengthByteArray = null;
|
||||
try {
|
||||
Schema schemaDataAvro = getSchemaFromHashMap(topic);
|
||||
logger.info(topic + ": 消息批量组装数量" + contents.size());
|
||||
|
||||
for (String dataJson : contents) {
|
||||
byte[] resultArray = null;
|
||||
ByteArrayOutputStream outAvro = new ByteArrayOutputStream();
|
||||
@@ -804,9 +804,8 @@ public class HttpClientUtil {
|
||||
results = byteMerger(results, dataLengthByteArray, resultArray);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
logger.info(topic + "-组装后占用Bytes:" + " , " + results.length);
|
||||
logger.info(topic + "数据封装记录 "+ contents.size() + " 条,字节数为:" + " , " + results.length);
|
||||
} catch (Exception e) {
|
||||
logger.error("Encapsulation Message Data Error " + e);
|
||||
}
|
||||
@@ -831,7 +830,7 @@ public class HttpClientUtil {
|
||||
httpPost.addHeader("Cookie", msgSessionCookie);
|
||||
String md5Avro = MD5Utils.md5Encode(results);
|
||||
httpPost.addHeader("Checksum", md5Avro);
|
||||
logger.info("批量发送,总体请求端的Checksum MD5 avro 加密为:" + md5Avro);
|
||||
logger.info("批量发送body Checksum MD5 为:" + md5Avro);
|
||||
httpPost.addHeader("Content-Type", "binary/octet-stream");
|
||||
httpPost.addHeader("X-Tag", getXTAG(dataJson, topic));
|
||||
ByteArrayEntity payload = new ByteArrayEntity(results);
|
||||
|
||||
@@ -133,7 +133,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
logger.info("数据加载成功,返回码: " + statuCode);
|
||||
logger.debug("生产数据==>" + dataJson + "<==," +
|
||||
"生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
|
||||
"服务端响应时间responseTime==>" + (System.currentTimeMillis() - postTime) + "<==," +
|
||||
"服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," +
|
||||
"数据加载成功,返回码: " + statuCode);
|
||||
|
||||
AvroMonitorTimerTask.addSuccessNum(count);
|
||||
@@ -142,7 +142,9 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
AvroMonitorTimerTask.addFailedNum(count);
|
||||
switch (resRedirBodyCode) {
|
||||
case 300:
|
||||
logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
//若不包含对应字段,则不进行对象转换,减少报错
|
||||
if (ret.contains("redirect")) {
|
||||
ResRedirBody resRedirBody = JSONObject.parseObject(ret, ResRedirBody.class);
|
||||
@@ -157,19 +159,27 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
YbHttpAvroSinkFile.redirectContents.add(dataJson);
|
||||
break;
|
||||
case 301:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
|
||||
YbHttpAvroSinkFile.redirectContents.add(dataJson);
|
||||
break;
|
||||
case 410:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
case 500:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + ",resRedirBodyCode:500,处理请求过程出现系统错误.");
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:500,处理请求过程出现系统错误.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
default:
|
||||
logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode);
|
||||
logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,7 +369,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
long beginAvroTime = System.currentTimeMillis();
|
||||
byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
|
||||
logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) );
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@@ -383,7 +385,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
|
||||
transaction.commit();
|
||||
@@ -391,6 +392,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
if (transaction != null) {
|
||||
transaction.commit();
|
||||
}
|
||||
AvroMonitorTimerTask.subReadyPostNum(contents.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,17 @@ public class AvroMonitorTimerTask {
|
||||
}
|
||||
|
||||
|
||||
public static synchronized void subReadyPostNum(int count) {
|
||||
if ( msgReadyPostSum >= count) {
|
||||
msgReadyPostSum -= count;
|
||||
} else {
|
||||
msgReadyPostSum = 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user