fix(batch): 未整理目前已上线代码,需增加分支进行功能扩展
This commit is contained in:
@@ -0,0 +1,79 @@
|
||||
package cn.ac.iie.cusflume.sink.CommonUtils;
|
||||
|
||||
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientFactory;
|
||||
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil;
|
||||
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack;
|
||||
import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.http.HttpHeaders;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.ResponseHandler;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.protocol.HTTP;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.mortbay.log.Log;
|
||||
import scala.util.Try;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
|
||||
public class HttpClientTest {
|
||||
|
||||
private static CloseableHttpClient client = null;
|
||||
private static Logger LOG = Logger.getLogger(HttpClientTest.class);
|
||||
static {
|
||||
//连接池对象
|
||||
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
|
||||
//将最大连接数增加到200
|
||||
connectionManager.setMaxTotal(40);
|
||||
//将每个路由的默认最大连接数增加到20
|
||||
connectionManager.setDefaultMaxPerRoute(40);
|
||||
//HttpClient对象
|
||||
client = HttpClients.custom().setConnectionManager(connectionManager).build();
|
||||
}
|
||||
|
||||
|
||||
public static void producerAvroMessageToZX (String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
|
||||
try {
|
||||
HttpPost httpPost = new HttpPost(urlProducer);
|
||||
//2020-9-9 新加头部
|
||||
httpPost.setHeader("Connection", "Keep-Alive");
|
||||
httpPost.setHeader("Content-type", "application/avro+json;charset=UTF-8");
|
||||
httpPost.setHeader(HttpHeaders.USER_AGENT, userAgent);
|
||||
httpPost.addHeader("X-Tag", HttpClientUtil.getXTAG(dataJson, topic));
|
||||
httpPost.setHeader("Cookie", msgSessionCookie);
|
||||
String md5Avro = MD5Utils.md5Encode(results);
|
||||
httpPost.addHeader("Checksum", md5Avro);
|
||||
httpPost.setEntity(new ByteArrayEntity(results));
|
||||
|
||||
ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
|
||||
@Override
|
||||
public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException {
|
||||
return EntityUtils.toString(response.getEntity());
|
||||
}
|
||||
};
|
||||
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(10000)
|
||||
.setConnectTimeout(10000)
|
||||
.setSocketTimeout(10000)
|
||||
.build();
|
||||
httpPost.setConfig(requestConfig);
|
||||
long startTime = System.currentTimeMillis();
|
||||
String body = client.execute(httpPost, responseHandler); // 线程可能会在这里被阻塞
|
||||
Log.warn(Thread.currentThread().getName() + "用时" + (System.currentTimeMillis() - startTime));
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,232 @@
|
||||
package cn.ac.iie.cusflume.sink.CommonUtils;
|
||||
|
||||
import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile;
|
||||
import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
|
||||
import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody;
|
||||
import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.*;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.HttpRequestRetryHandler;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.protocol.HttpClientContext;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.conn.ConnectionKeepAliveStrategy;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||
import org.apache.http.message.BasicHeaderElementIterator;
|
||||
import org.apache.http.protocol.HTTP;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
public class SinkHttpClientUtil {
|
||||
private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class);
|
||||
/** 全局连接池对象 */
|
||||
private static final PoolingHttpClientConnectionManager connManager =
|
||||
new PoolingHttpClientConnectionManager();
|
||||
public static final String DEFAULT_CHARSET = "utf-8";
|
||||
private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整
|
||||
private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时
|
||||
private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数
|
||||
private static int maxPerRoute = RealtimeCountConfig.HTTP_ASYNC_MAXPERROUTE;//每个主机的并发最多只有1500
|
||||
private static int connectionRequestTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTIONREQUESTTIMEOUT; //从连接池中后去连接的timeout时间
|
||||
|
||||
|
||||
static {
|
||||
connManager.setMaxTotal(poolSize);
|
||||
connManager.setDefaultMaxPerRoute(maxPerRoute);
|
||||
}
|
||||
|
||||
public static CloseableHttpClient getHttpClient() {
|
||||
RequestConfig requestConfig = RequestConfig.custom()
|
||||
// 获取连接超时时间
|
||||
.setConnectionRequestTimeout(connectionRequestTimeout)
|
||||
// 请求超时时间
|
||||
.setConnectTimeout(connectTimeout)
|
||||
// 响应超时时间
|
||||
.setSocketTimeout(socketTimeout)
|
||||
.build();
|
||||
|
||||
|
||||
/**
|
||||
* 测出超时重试机制为了防止超时不生效而设置
|
||||
* 如果直接放回false,不重试
|
||||
* 这里会根据情况进行判断是否重试
|
||||
*/
|
||||
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
|
||||
if (executionCount >= 3) {// 如果已经重试了3次,就放弃
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
|
||||
return true;
|
||||
}
|
||||
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof InterruptedIOException) {// 超时
|
||||
return true;
|
||||
}
|
||||
if (exception instanceof UnknownHostException) {// 目标服务器不可达
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof SSLException) {// ssl握手异常
|
||||
return false;
|
||||
}
|
||||
HttpClientContext clientContext = HttpClientContext.adapt(context);
|
||||
HttpRequest request = clientContext.getRequest();
|
||||
// 如果请求是幂等的,就再次尝试
|
||||
if (!(request instanceof HttpEntityEnclosingRequest)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
|
||||
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
|
||||
HeaderElementIterator it = new BasicHeaderElementIterator
|
||||
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
|
||||
while (it.hasNext()) {
|
||||
HeaderElement he = it.nextElement();
|
||||
String param = he.getName();
|
||||
String value = he.getValue();
|
||||
if (value != null && param.equalsIgnoreCase("timeout")) {
|
||||
return Long.parseLong(value) * 1000;
|
||||
}
|
||||
}
|
||||
return 60 * 1000;//如果没有约定,则默认定义时长为60s
|
||||
};
|
||||
|
||||
// 创建httpClient
|
||||
return HttpClients.custom()
|
||||
// 把请求相关的超时信息设置到连接客户端
|
||||
.setDefaultRequestConfig(requestConfig)
|
||||
// 把请求重试设置到连接客户端
|
||||
.setRetryHandler(retry)
|
||||
.setKeepAliveStrategy(myStrategy)
|
||||
// 配置连接池管理对象
|
||||
.setConnectionManager(connManager)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static void httpPost(String url, byte[] requestBody, int batchSize, Header... headers) {
|
||||
String msg = "-1";
|
||||
// 获取客户端连接对象
|
||||
CloseableHttpClient httpClient = getHttpClient();
|
||||
// 创建POST请求对象
|
||||
CloseableHttpResponse response = null;
|
||||
try {
|
||||
HttpPost httpPost = new HttpPost(url);
|
||||
httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
|
||||
httpPost.setHeader("Content-Type", "application/json");
|
||||
if (StringUtil.isNotEmpty(headers)) {
|
||||
for (Header h : headers) {
|
||||
httpPost.addHeader(h);
|
||||
}
|
||||
}
|
||||
ByteArrayEntity payload = new ByteArrayEntity(requestBody);
|
||||
payload.setContentEncoding("utf-8");
|
||||
httpPost.setEntity(payload);
|
||||
long startTime = System.currentTimeMillis();
|
||||
response = httpClient.execute(httpPost);
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
// 获取响应实体
|
||||
HttpEntity entity = response.getEntity();
|
||||
// 获取响应信息
|
||||
msg = EntityUtils.toString(entity, "UTF-8");
|
||||
Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime));
|
||||
if (statusCode != HttpStatus.SC_OK ) {
|
||||
AvroMonitorTimerTask.addSuccessNum(batchSize);
|
||||
LOG.info("数据总线响应内容:" + msg);
|
||||
} else {
|
||||
AvroMonitorTimerTask.addFailedNum(batchSize);
|
||||
LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg);
|
||||
|
||||
switch (statusCode) {
|
||||
case 300:
|
||||
LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>"
|
||||
+ ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
//若不包含对应字段,则不进行对象转换,减少报错
|
||||
if (msg.contains("redirect")) {
|
||||
ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class);
|
||||
String redirectUrl = resRedirBody.getData().getRedirect();
|
||||
if (StringUtils.isNotBlank(redirectUrl)) {
|
||||
YbHttpAvroSinkFile.changeUrl(redirectUrl);
|
||||
}
|
||||
} else {
|
||||
LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
|
||||
}
|
||||
|
||||
|
||||
break;
|
||||
case 301:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode +
|
||||
",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
|
||||
break;
|
||||
case 410:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + "<==,"
|
||||
+ ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
case 500:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + "<==,"
|
||||
+ ",resRedirBodyCode:500,处理请求过程出现系统错误.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
default:
|
||||
LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==,"
|
||||
+ "---Status Code:" + statusCode );
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
} catch (ClientProtocolException e) {
|
||||
LOG.error("协议错误: {}", e);
|
||||
} catch (ParseException e) {
|
||||
LOG.error("解析错误: {}", e);
|
||||
} catch (IOException e) {
|
||||
LOG.error("IO错误: {}", e);
|
||||
} finally {
|
||||
if (null != response) {
|
||||
try {
|
||||
EntityUtils.consumeQuietly(response.getEntity());
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("释放链接错误: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -167,8 +167,9 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
if (statusCode == 200) {
|
||||
LOG.info("AsyncHttpClientPostFileCallback completed,post file statuscode is:{" + statusCode + "},now start to send message to zx.");
|
||||
//post文件成功后可以获取文件id
|
||||
String postResBody = getHttpContent(response);
|
||||
/* //post文件成功后可以获取文件id
|
||||
|
||||
System.out.println("Post File to zx resBody====>" + postResBody);
|
||||
if (StringUtils.isNotBlank(postResBody)) {
|
||||
this.postFileResBody = JSONObject.parseObject(postResBody, PostFileResBody.class);
|
||||
@@ -185,7 +186,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
LOG.info("Send message with fileId to zx over,this responseBody is===>" + proResBody.toString());
|
||||
} else {
|
||||
LOG.warn("AsyncHttpClientPostFileCallback post file success but postResBody(response body) is null.");
|
||||
}
|
||||
}*/
|
||||
} else if (statusCode == 403) {//空文件,不再重试发送
|
||||
LOG.error("AsyncHttpClientPostFileCallback post zxFile statusCode is 403 so get the fileIs but this minio file is empty.This message is===>>>" + sendMsg + "<<<===");
|
||||
} else {
|
||||
@@ -196,7 +197,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
HttpClientUtils.closeQuietly(response);
|
||||
HttpClientUtils.closeQuietly(response);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -227,7 +228,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
if (count > 1) {
|
||||
LOG.warn("AsyncHttpClientPostFileCallback post zxFile is failed,retry count=" + count);
|
||||
}
|
||||
if (count > 4) {
|
||||
/* if (count > 4) {
|
||||
LOG.error("AsyncHttpClientPostFileCallback post zxFile is failed and already retry 3 times.error===>>>" + e + "<<<===.This message is===>>>" + sendMsg + "<<<===");
|
||||
} else {
|
||||
if (configInfo != null && StringUtils.isNotBlank(sendMsg) && resultIs != null) {
|
||||
@@ -236,7 +237,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
//重试时也是以之前存储的流的形式发往总线
|
||||
HttpClientUtil.httpAsyncPostFileToZx(configInfo.getPostFileUrl(), resultIs, asyncHttpClientPostFileCallback);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
} catch (Exception e2) {
|
||||
LOG.error("AsyncHttpClientGetFileCallback retry is error===>>>" + e2);
|
||||
}
|
||||
@@ -253,7 +254,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
if (count > 1) {
|
||||
LOG.warn("AsyncHttpClientPostFileCallback post zxFile statusCode is abnormal,retry count=" + count);
|
||||
}
|
||||
if (count > 4) {
|
||||
/* if (count > 4) {
|
||||
LOG.error("AsyncHttpClientPostFileCallback post zxFile statusCode is abnormal and already retry 3 times.statusCode is{" + statusCode + "}.This message is===>>>" + sendMsg + "<<<===");
|
||||
} else {
|
||||
if (configInfo != null && StringUtils.isNotBlank(sendMsg) && resultIs != null) {
|
||||
@@ -262,7 +263,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
|
||||
//重试时也是以之前存储的流的形式发往总线
|
||||
HttpClientUtil.httpAsyncPostFileToZx(configInfo.getPostFileUrl(), resultIs, asyncHttpClientPostFileCallback);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
} catch (Exception e2) {
|
||||
LOG.error("AsyncHttpClientGetFileCallback retry is error===>>>" + e2);
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ public class HttpAsyncClient {
|
||||
.register("https", new SSLIOSessionStrategy(sslcontext, NoopHostnameVerifier.INSTANCE))
|
||||
.build();
|
||||
|
||||
// 配置io线程
|
||||
// 配置io线程\
|
||||
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
|
||||
.setSoKeepAlive(true)
|
||||
.setTcpNoDelay(true)
|
||||
@@ -185,7 +185,7 @@ public class HttpAsyncClient {
|
||||
return Long.parseLong(value) * 1000;
|
||||
}
|
||||
}
|
||||
return 10 * 1000;//如果没有约定,则默认定义时长为10s
|
||||
return 60 * 1000;//如果没有约定,则默认定义时长为10s
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package cn.ac.iie.cusflume.sink.HttpAsyncUtils;
|
||||
|
||||
import cn.ac.iie.cusflume.sink.CommonUtils.GetDataDictionaryCodeByTopicUtils;
|
||||
import cn.ac.iie.cusflume.sink.CommonUtils.GetFilePathByTopicUtils;
|
||||
import cn.ac.iie.cusflume.sink.CommonUtils.HttpClientTest;
|
||||
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.mail.AsyncPostMailFilesCallback;
|
||||
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack;
|
||||
import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
|
||||
@@ -78,18 +79,18 @@ public class HttpClientUtil {
|
||||
HttpPost httpPost = new HttpPost(baseUrl);
|
||||
|
||||
//Parameters
|
||||
LOG.warn("==== Parameters ======" + list);
|
||||
LOG.info("==== Parameters ======" + list);
|
||||
CloseableHttpResponse response = null;
|
||||
try {
|
||||
httpPost.setEntity(new UrlEncodedFormEntity(list));
|
||||
// httpPost.setHeader("Connection","close");
|
||||
response = httpClient.execute(httpPost);
|
||||
LOG.warn("========HttpResponseProxy:========" + response.getStatusLine());
|
||||
LOG.info("========HttpResponseProxy:========" + response.getStatusLine());
|
||||
HttpEntity entity = response.getEntity();
|
||||
String result = null;
|
||||
if (entity != null) {
|
||||
result = EntityUtils.toString(entity, "UTF-8");
|
||||
LOG.warn("========Response=======" + result);
|
||||
LOG.info("========Response=======" + result);
|
||||
}
|
||||
EntityUtils.consume(entity);
|
||||
return result;
|
||||
@@ -814,10 +815,56 @@ public class HttpClientUtil {
|
||||
|
||||
}
|
||||
|
||||
public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException {
|
||||
HttpClientTest.producerAvroMessageToZX(urlProducer, topic, results, dataJson, userAgent, msgSessionCookie, count, postTime);
|
||||
}
|
||||
|
||||
|
||||
public static void asyncProducerAvroMessageToZX_toBatch(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
|
||||
HttpPost httpPost = null;
|
||||
urlProducer = urlProducer.trim();
|
||||
CloseableHttpAsyncClient httpClient = null;
|
||||
try {
|
||||
httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient();
|
||||
httpClient.start();
|
||||
httpPost = new HttpPost(urlProducer);
|
||||
httpPost.addHeader("User-Agent", userAgent);
|
||||
httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
|
||||
httpPost.addHeader("Cookie", msgSessionCookie);
|
||||
String md5Avro = MD5Utils.md5Encode(results);
|
||||
httpPost.addHeader("Checksum", md5Avro);
|
||||
logger.info("批量发送body Checksum MD5 为:" + md5Avro);
|
||||
httpPost.addHeader("Content-Type", "binary/octet-stream");
|
||||
httpPost.addHeader("X-Tag", "getXTAG(dataJson, topic)");
|
||||
ByteArrayEntity payload = new ByteArrayEntity(results);
|
||||
payload.setContentEncoding("utf-8");
|
||||
httpPost.setEntity(payload);
|
||||
AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
|
||||
httpClient.execute(httpPost, asyncPostMsgCallBack);
|
||||
logger.info("当前Thread number ID :" + Thread.currentThread().getId());
|
||||
} catch (MalformedURLException e) {
|
||||
//执行URL url = new URL()的异常
|
||||
e.printStackTrace();
|
||||
} catch (ClientProtocolException e) {
|
||||
// 执行httpClient.execute(httpGet)的异常
|
||||
e.printStackTrace();
|
||||
} catch (IOException e) {
|
||||
// 执行httpClient.execute(httpGet)的异常
|
||||
logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
//handle response here... try other servers
|
||||
logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 批量生产消息-总线
|
||||
*/
|
||||
public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
|
||||
public static void asyncProducerAvroMessageToZX_bk(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
|
||||
HttpPost httpPost = null;
|
||||
urlProducer = urlProducer.trim();
|
||||
CloseableHttpAsyncClient httpClient = null;
|
||||
@@ -838,6 +885,7 @@ public class HttpClientUtil {
|
||||
httpPost.setEntity(payload);
|
||||
AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
|
||||
httpClient.execute(httpPost, asyncPostMsgCallBack);
|
||||
logger.info("当前Thread number ID :" + Thread.currentThread().getId());
|
||||
} catch (MalformedURLException e) {
|
||||
//执行URL url = new URL()的异常
|
||||
e.printStackTrace();
|
||||
@@ -1169,7 +1217,7 @@ public class HttpClientUtil {
|
||||
* @param dataJson
|
||||
* @return
|
||||
*/
|
||||
private static String getXTAG(String dataJson, String topic) {
|
||||
public static String getXTAG(String dataJson, String topic) {
|
||||
if ("monitor-msg".equals(topic) || "INFLUX-SAPP-BPS-STAT-LOG".equals(topic)) {
|
||||
return RealtimeCountConfig.MONITOR_NOFILE_MSG_X_TAG;
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
public void completed(HttpResponse response) {
|
||||
// ProResBody proResBody = null;
|
||||
try {
|
||||
int statuCode = response.getStatusLine().getStatusCode();
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
HttpEntity entity = response.getEntity();
|
||||
String ret = null;
|
||||
if (entity != null) {
|
||||
@@ -129,12 +129,12 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
/**
|
||||
* 20200818-接口细化响应码
|
||||
*/
|
||||
if (statuCode == 200 && resRedirBodyCode == 200) {
|
||||
logger.info("数据加载成功,返回码: " + statuCode);
|
||||
if (statusCode == 200 && resRedirBodyCode == 200) {
|
||||
logger.info("数据加载成功,返回码: " + statusCode);
|
||||
logger.debug("生产数据==>" + dataJson + "<==," +
|
||||
"生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
|
||||
"服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," +
|
||||
"数据加载成功,返回码: " + statuCode);
|
||||
"数据加载成功,返回码: " + statusCode);
|
||||
|
||||
AvroMonitorTimerTask.addSuccessNum(count);
|
||||
|
||||
@@ -144,7 +144,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
case 300:
|
||||
logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
+ "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
//若不包含对应字段,则不进行对象转换,减少报错
|
||||
if (ret.contains("redirect")) {
|
||||
ResRedirBody resRedirBody = JSONObject.parseObject(ret, ResRedirBody.class);
|
||||
@@ -159,19 +159,19 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
YbHttpAvroSinkFile.redirectContents.add(dataJson);
|
||||
break;
|
||||
case 301:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
|
||||
YbHttpAvroSinkFile.redirectContents.add(dataJson);
|
||||
break;
|
||||
case 410:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
case 500:
|
||||
logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode
|
||||
logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ ",resRedirBodyCode:500,处理请求过程出现系统错误.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
@@ -179,7 +179,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
|
||||
default:
|
||||
logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret
|
||||
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
|
||||
+ "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode);
|
||||
+ "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
package cn.ac.iie.cusflume.sink;
|
||||
|
||||
import cn.ac.iie.cusflume.sink.CommonUtils.SinkHttpClientUtil;
|
||||
import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
|
||||
import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils;
|
||||
import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.ParseException;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.protocol.HTTP;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SinkService {
|
||||
private static final SinkService sinkService = new SinkService();
|
||||
|
||||
private SinkService() {
|
||||
}
|
||||
//SinkHttpClientUtil.getHttpClient();
|
||||
public static CloseableHttpClient httpClient = null;
|
||||
private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class);
|
||||
|
||||
public static SinkService getInstance() {
|
||||
return sinkService;
|
||||
}
|
||||
|
||||
public void producerAvroMessageToBus(
|
||||
String urlProducer, String topic, byte[] results,
|
||||
String dataJson, String userAgent, String msgSessionCookie, int batchSize) {
|
||||
|
||||
httpClient = SinkHttpClientUtil.getHttpClient();
|
||||
HttpPost httpPost = null;
|
||||
urlProducer = urlProducer.trim();
|
||||
String msg = "-1";
|
||||
// 创建POST请求对象
|
||||
CloseableHttpResponse response = null;
|
||||
try {
|
||||
httpPost = new HttpPost(urlProducer);
|
||||
httpPost.addHeader("User-Agent", userAgent);
|
||||
httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
|
||||
httpPost.addHeader("Cookie", msgSessionCookie);
|
||||
String md5Avro = MD5Utils.md5Encode(results);
|
||||
httpPost.addHeader("Checksum", md5Avro);
|
||||
LOG.info("批量发送body Checksum MD5 为:" + md5Avro);
|
||||
httpPost.addHeader("Content-Type", "binary/octet-stream");
|
||||
httpPost.addHeader("X-Tag", cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil.getXTAG(dataJson, topic));
|
||||
ByteArrayEntity payload = new ByteArrayEntity(results);
|
||||
payload.setContentEncoding("utf-8");
|
||||
httpPost.setEntity(payload);
|
||||
long startTime = System.currentTimeMillis();
|
||||
response = httpClient.execute(httpPost);
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
// 获取响应实体
|
||||
HttpEntity entity = response.getEntity();
|
||||
// 获取响应信息
|
||||
msg = EntityUtils.toString(entity, "UTF-8");
|
||||
Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime));
|
||||
if (statusCode != HttpStatus.SC_OK) {
|
||||
AvroMonitorTimerTask.addSuccessNum(batchSize);
|
||||
LOG.info("数据总线响应内容:" + msg);
|
||||
} else {
|
||||
AvroMonitorTimerTask.addFailedNum(batchSize);
|
||||
LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg);
|
||||
|
||||
switch (statusCode) {
|
||||
case 300:
|
||||
LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>"
|
||||
+ ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
|
||||
//若不包含对应字段,则不进行对象转换,减少报错
|
||||
if (msg.contains("redirect")) {
|
||||
ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class);
|
||||
String redirectUrl = resRedirBody.getData().getRedirect();
|
||||
if (StringUtils.isNotBlank(redirectUrl)) {
|
||||
YbHttpAvroSinkFile.changeUrl(redirectUrl);
|
||||
}
|
||||
} else {
|
||||
LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
|
||||
}
|
||||
|
||||
|
||||
break;
|
||||
case 301:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode +
|
||||
",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
|
||||
break;
|
||||
case 410:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + "<==,"
|
||||
+ ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
case 500:
|
||||
LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
|
||||
+ "服务端响应时间(ms)==>" + "<==,"
|
||||
+ ",resRedirBodyCode:500,处理请求过程出现系统错误.");
|
||||
YbHttpAvroSinkFile.updateCookie();
|
||||
break;
|
||||
default:
|
||||
LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==,"
|
||||
+ "---Status Code:" + statusCode);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
} catch (ClientProtocolException e) {
|
||||
LOG.error("协议错误: {}", e);
|
||||
} catch (ParseException e) {
|
||||
LOG.error("解析错误: {}", e);
|
||||
} catch (IOException e) {
|
||||
LOG.error("IO错误: {}", e);
|
||||
} catch (Exception e) {
|
||||
LOG.error("其它错误: {}", e);
|
||||
} finally {
|
||||
if (null != response) {
|
||||
try {
|
||||
EntityUtils.consumeQuietly(response.getEntity());
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("释放链接错误: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.apache.flume.sink.AbstractSink;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -25,7 +26,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
private static Logger logger = Logger.getLogger(YbHttpAvroSinkFile.class);
|
||||
|
||||
protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM);
|
||||
|
||||
private static ConcurrentLinkedQueue<byte[]> concurrentLinkedQueue = new ConcurrentLinkedQueue<byte[]>();
|
||||
private static DataCenterLoad dcl;
|
||||
|
||||
private static String postMsgUrl;//发送消息路径,配置文件获取,发送文件与发送消息皆需要
|
||||
@@ -42,6 +43,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
private static String userAgent;//业务系统编码systemId,该字段为系统的唯一编码,配置文件获取
|
||||
|
||||
private static String xTag;//标签编码tag,在总线中唯一标识该标签,配置文件获取--20191217笔记--貌似现在已经不需要这个参数作为头部了
|
||||
private static String dataJson;
|
||||
|
||||
private static String msgSessionCookie;//消息会话标识,由响应返回
|
||||
private static String fileSessionCookie;//文件会话标识,由响应返回,仅发送文件时需要---若只发送消息,则fileSessionCookie会一直为空-即仅广东需要
|
||||
@@ -94,6 +96,10 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
logger.error("Starting YbHttpAvroSinkFile is error==>checkMsgUrl and postMsgUrl can not be null!!!!");
|
||||
}
|
||||
|
||||
logger.warn("启动Sink File 执行程序 ==============");
|
||||
//new Thread(new Consumer()).start();
|
||||
logger.warn("开启多线程消费队列数据==================");
|
||||
|
||||
logger.warn("Starting YbHttpAvroSinkFile ... ...");
|
||||
}
|
||||
|
||||
@@ -172,10 +178,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
public Status process() throws EventDeliveryException {
|
||||
Status result = Status.READY;
|
||||
Channel channel = getChannel();
|
||||
Transaction transaction = null;
|
||||
// AcResBody acCheckResBody = null;
|
||||
// ProResBody producerResBody = null;
|
||||
Transaction transaction = null;
|
||||
try {
|
||||
logger.debug("Current Process Thread number ID :" + Thread.currentThread().getId());
|
||||
transaction = channel.getTransaction();
|
||||
transaction.begin();
|
||||
Event event = null;
|
||||
@@ -214,6 +219,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
*/
|
||||
case "INFLUX-SAPP-BPS-STAT-LOG"://读取回写的influxDB合计数据用作状态上传
|
||||
sendMsgLog(transaction, contents);//20191209移除文件发送,仅处理消息
|
||||
//sendMsgController(transaction, contents);
|
||||
break;
|
||||
default:
|
||||
logger.error("YbHttpAvroSinkFile can't find this topic:" + topicName + ".Please confirm this topicName is correct!!!");
|
||||
@@ -280,8 +286,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
}
|
||||
if (statusCode == 200) {
|
||||
if (StringUtils.isNotBlank(acCheckMsgMonitorResBody.getSessionId())) {
|
||||
logger.warn("getMonitorSessionCookie-Thread.currentThread().getName()===>" + Thread.currentThread().getName());
|
||||
logger.warn("AC msgMonitor successfully,msgMonitor sessionId is ===>" + acCheckMsgMonitorResBody.getSessionId());
|
||||
logger.info("getMonitorSessionCookie-Thread.currentThread().getName()===>" + Thread.currentThread().getName());
|
||||
logger.info("AC msgMonitor successfully,msgMonitor sessionId is ===>" + acCheckMsgMonitorResBody.getSessionId());
|
||||
monitorSessionCookie = acCheckMsgMonitorResBody.getSessionId();
|
||||
}
|
||||
} else if (statusCode == 0) {
|
||||
@@ -308,21 +314,22 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
return configInfo;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 往zx发送文件数据的消息,即发送文件的message数据(结构化数据)
|
||||
* 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224
|
||||
*
|
||||
* 发送消息控制器
|
||||
* @param transaction
|
||||
* @param contents
|
||||
*/
|
||||
private void sendMsgLog(Transaction transaction, List<String> contents) {
|
||||
|
||||
private void sendMsgController(Transaction transaction, List<String> contents) {
|
||||
try {
|
||||
AvroMonitorTimerTask.addTotalNum(contents.size());
|
||||
AvroMonitorTimerTask.addReadyPostNum(contents.size());
|
||||
/**
|
||||
* 获取状态回传sessionID,检查认证是否存在
|
||||
*/
|
||||
if (StringUtils.isBlank(monitorSessionCookie)
|
||||
|| StringUtils.isBlank(msgSessionCookie)) {
|
||||
|| StringUtils.isBlank(msgSessionCookie)) {
|
||||
getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie
|
||||
getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证
|
||||
if (!checkTimerStart) {
|
||||
@@ -338,9 +345,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
logger.warn("MonitorMsg Timer is started......");
|
||||
}
|
||||
|
||||
/**
|
||||
* 开启定时扫描重定向数据集合
|
||||
*/
|
||||
|
||||
if (!redirectContentsPostStart) {
|
||||
postRedirectDataEveryMin();
|
||||
redirectContentsPostStart = true;
|
||||
@@ -349,8 +354,147 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
}
|
||||
|
||||
|
||||
if (isSingle(topicName)) {
|
||||
|
||||
for (String content : contents) {
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content,
|
||||
userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0
|
||||
} catch (Exception e) {
|
||||
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
} else {
|
||||
xTag = HttpClientUtil.getXTAG(contents.get(0), topicName);
|
||||
dataJson = contents.get(0);
|
||||
int size = contents.size() / 100;
|
||||
if (size > 0) {
|
||||
for (int i = 0; i < size; i++) {
|
||||
// byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents.subList(i*1, (i+1)*1));
|
||||
//concurrentLinkedQueue.add(msgResults);
|
||||
pool.execute(new Producer(contents.subList(i * 100, (i + 1) * 100)));
|
||||
}
|
||||
if (contents.size() % 100 > 0) {
|
||||
pool.execute(new Producer(contents.subList(size, contents.size())));
|
||||
}
|
||||
} else {
|
||||
pool.execute(new Producer(contents));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
|
||||
transaction.commit();
|
||||
} finally {
|
||||
if (transaction != null) {
|
||||
transaction.commit();
|
||||
}
|
||||
AvroMonitorTimerTask.subReadyPostNum(contents.size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
class Producer implements Runnable {
|
||||
|
||||
private List<String> contents;
|
||||
public Producer(List<String> contents) { ;
|
||||
this.contents = contents;
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.debug("Current Producer Thread number ID :" + Thread.currentThread().getId());
|
||||
byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
|
||||
concurrentLinkedQueue.add(msgResults);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Consumer implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId());
|
||||
while(true) {
|
||||
|
||||
if (concurrentLinkedQueue.isEmpty()) {
|
||||
logger.info("当前队列无数据,等待数据接入!");
|
||||
} else {
|
||||
byte[] result = concurrentLinkedQueue.poll();
|
||||
HttpClientUtil.asyncProducerAvroMessageToZX_toBatch(postMsgUrl, topicName, result, dataJson,
|
||||
userAgent, msgSessionCookie, 100, System.currentTimeMillis());//初始发送count计数为0
|
||||
logger.info("生产数据,等待数据接入!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 往zx发送文件数据的消息,即发送文件的message数据(结构化数据)
|
||||
* 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224
|
||||
*
|
||||
* @param transaction
|
||||
* @param contents
|
||||
*/
|
||||
private void sendMsgLog(Transaction transaction, List<String> contents) {
|
||||
|
||||
try {
|
||||
|
||||
logger.info("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId());
|
||||
|
||||
/**
|
||||
* 获取状态回传sessionID,检查认证是否存在
|
||||
*/
|
||||
if (StringUtils.isBlank(monitorSessionCookie)
|
||||
|| StringUtils.isBlank(msgSessionCookie)) {
|
||||
getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie
|
||||
getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证
|
||||
if (!checkTimerStart) {
|
||||
checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie
|
||||
checkTimerStart = true;
|
||||
logger.debug("CheckMsgAndFileCookie Timer is started......");
|
||||
}
|
||||
|
||||
if (!monitorStart) {//消息定时上报
|
||||
AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl,
|
||||
"monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息
|
||||
monitorStart = true;
|
||||
logger.debug("MonitorMsg Timer is started......");
|
||||
}
|
||||
|
||||
|
||||
if (!redirectContentsPostStart) {
|
||||
postRedirectDataEveryMin();
|
||||
redirectContentsPostStart = true;
|
||||
logger.debug("RedirectContents Timer Post is started......");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
AvroMonitorTimerTask.addTotalNum(contents.size());
|
||||
logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
|
||||
logger.debug("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
|
||||
AvroMonitorTimerTask.addReadyPostNum(contents.size());
|
||||
if (isSingle(topicName)) {
|
||||
for (String content : contents) {
|
||||
@@ -371,28 +515,56 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
|
||||
} else {
|
||||
long beginAvroTime = System.currentTimeMillis();
|
||||
byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
|
||||
logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) );
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0),
|
||||
userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0
|
||||
} catch (Exception e) {
|
||||
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
|
||||
logger.debug("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) );
|
||||
HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
|
||||
userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());
|
||||
/* for (String content : contents) {
|
||||
HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
|
||||
userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0
|
||||
}*/
|
||||
|
||||
|
||||
/*for (String content : contents) {
|
||||
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
|
||||
userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());
|
||||
} catch (Exception e) {
|
||||
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}*/
|
||||
|
||||
/* for (String content : contents) {
|
||||
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SinkService.getInstance().producerAvroMessageToBus(postMsgUrl, topicName, msgResults, contents.get(0),
|
||||
userAgent, msgSessionCookie, contents.size());
|
||||
} catch (Exception e) {
|
||||
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
|
||||
}
|
||||
}
|
||||
});
|
||||
}*/
|
||||
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
|
||||
transaction.commit();
|
||||
} finally {
|
||||
if (transaction != null) {
|
||||
logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
|
||||
transaction.commit();
|
||||
}
|
||||
AvroMonitorTimerTask.subReadyPostNum(contents.size());
|
||||
} finally {
|
||||
if (transaction != null) {
|
||||
transaction.commit();
|
||||
}
|
||||
AvroMonitorTimerTask.subReadyPostNum(contents.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user