diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java new file mode 100644 index 0000000..f072cf6 --- /dev/null +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java @@ -0,0 +1,79 @@ +package cn.ac.iie.cusflume.sink.CommonUtils; + +import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientFactory; +import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil; +import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack; +import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils; +import com.alibaba.fastjson.JSON; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; +import scala.util.Try; + +import java.io.IOException; +import java.net.MalformedURLException; + +public class HttpClientTest { + + private static CloseableHttpClient client = null; + private static Logger LOG = Logger.getLogger(HttpClientTest.class); + static { + //连接池对象 + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + //将最大连接数增加到200 + connectionManager.setMaxTotal(40); + //将每个路由的默认最大连接数增加到20 + connectionManager.setDefaultMaxPerRoute(40); + //HttpClient对象 + client = HttpClients.custom().setConnectionManager(connectionManager).build(); + } + + + public static void producerAvroMessageToZX (String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + try { + HttpPost httpPost = new HttpPost(urlProducer); + //2020-9-9 新加头部 + httpPost.setHeader("Connection", "Keep-Alive"); + httpPost.setHeader("Content-type", "application/avro+json;charset=UTF-8"); + httpPost.setHeader(HttpHeaders.USER_AGENT, userAgent); + httpPost.addHeader("X-Tag", HttpClientUtil.getXTAG(dataJson, topic)); + httpPost.setHeader("Cookie", msgSessionCookie); + String md5Avro = MD5Utils.md5Encode(results); + httpPost.addHeader("Checksum", md5Avro); + httpPost.setEntity(new ByteArrayEntity(results)); + + ResponseHandler responseHandler = new ResponseHandler() { + @Override + public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException { + return EntityUtils.toString(response.getEntity()); + } + }; + RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(10000) + .setConnectTimeout(10000) + .setSocketTimeout(10000) + .build(); + httpPost.setConfig(requestConfig); + long startTime = System.currentTimeMillis(); + String body = client.execute(httpPost, responseHandler); // 线程可能会在这里被阻塞 + Log.warn(Thread.currentThread().getName() + "用时" + (System.currentTimeMillis() - startTime)); + + } catch (Exception e) { + e.printStackTrace(); + } + + + } +} diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java new file mode 100644 index 0000000..acb69dd --- /dev/null +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java @@ -0,0 +1,232 @@ +package cn.ac.iie.cusflume.sink.CommonUtils; + +import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile; +import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; +import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody; +import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.http.*; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; + +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.UnknownHostException; + +public class SinkHttpClientUtil { + private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class); + /** 全局连接池对象 */ + private static final PoolingHttpClientConnectionManager connManager = + new PoolingHttpClientConnectionManager(); + public static final String DEFAULT_CHARSET = "utf-8"; + private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整 + private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时 + private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数 + private static int maxPerRoute = RealtimeCountConfig.HTTP_ASYNC_MAXPERROUTE;//每个主机的并发最多只有1500 + private static int connectionRequestTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTIONREQUESTTIMEOUT; //从连接池中后去连接的timeout时间 + + + static { + connManager.setMaxTotal(poolSize); + connManager.setDefaultMaxPerRoute(maxPerRoute); + } + + public static CloseableHttpClient getHttpClient() { + RequestConfig requestConfig = RequestConfig.custom() + // 获取连接超时时间 + .setConnectionRequestTimeout(connectionRequestTimeout) + // 请求超时时间 + .setConnectTimeout(connectTimeout) + // 响应超时时间 + .setSocketTimeout(socketTimeout) + .build(); + + + /** + * 测出超时重试机制为了防止超时不生效而设置 + * 如果直接放回false,不重试 + * 这里会根据情况进行判断是否重试 + */ + HttpRequestRetryHandler retry = (exception, executionCount, context) -> { + if (executionCount >= 3) {// 如果已经重试了3次,就放弃 + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + if (!(request instanceof HttpEntityEnclosingRequest)) { + return true; + } + return false; + }; + + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && param.equalsIgnoreCase("timeout")) { + return Long.parseLong(value) * 1000; + } + } + return 60 * 1000;//如果没有约定,则默认定义时长为60s + }; + + // 创建httpClient + return HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 + .setRetryHandler(retry) + .setKeepAliveStrategy(myStrategy) + // 配置连接池管理对象 + .setConnectionManager(connManager) + .build(); + + } + + + public static void httpPost(String url, byte[] requestBody, int batchSize, Header... headers) { + String msg = "-1"; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + HttpPost httpPost = new HttpPost(url); + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpPost.setHeader("Content-Type", "application/json"); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpPost.addHeader(h); + } + } + ByteArrayEntity payload = new ByteArrayEntity(requestBody); + payload.setContentEncoding("utf-8"); + httpPost.setEntity(payload); + long startTime = System.currentTimeMillis(); + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime)); + if (statusCode != HttpStatus.SC_OK ) { + AvroMonitorTimerTask.addSuccessNum(batchSize); + LOG.info("数据总线响应内容:" + msg); + } else { + AvroMonitorTimerTask.addFailedNum(batchSize); + LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg); + + switch (statusCode) { + case 300: + LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); + //若不包含对应字段,则不进行对象转换,减少报错 + if (msg.contains("redirect")) { + ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class); + String redirectUrl = resRedirBody.getData().getRedirect(); + if (StringUtils.isNotBlank(redirectUrl)) { + YbHttpAvroSinkFile.changeUrl(redirectUrl); + } + } else { + LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); + } + + + break; + case 301: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); + break; + case 410: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + "服务端响应时间(ms)==>" + "<==," + + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); + YbHttpAvroSinkFile.updateCookie(); + break; + case 500: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + "服务端响应时间(ms)==>" + "<==," + + ",resRedirBodyCode:500,处理请求过程出现系统错误."); + YbHttpAvroSinkFile.updateCookie(); + break; + default: + LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==," + + "---Status Code:" + statusCode ); + break; + } + + } + + + + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e); + } catch (ParseException e) { + LOG.error("解析错误: {}", e); + } catch (IOException e) { + LOG.error("IO错误: {}", e); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e); + } + } + } + + } + + + + + + + + + +} diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java index 4a53c04..6ed6e3f 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java @@ -167,8 +167,9 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback" + postResBody); if (StringUtils.isNotBlank(postResBody)) { this.postFileResBody = JSONObject.parseObject(postResBody, PostFileResBody.class); @@ -185,7 +186,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback" + proResBody.toString()); } else { LOG.warn("AsyncHttpClientPostFileCallback post file success but postResBody(response body) is null."); - } + }*/ } else if (statusCode == 403) {//空文件,不再重试发送 LOG.error("AsyncHttpClientPostFileCallback post zxFile statusCode is 403 so get the fileIs but this minio file is empty.This message is===>>>" + sendMsg + "<<<==="); } else { @@ -196,7 +197,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback 1) { LOG.warn("AsyncHttpClientPostFileCallback post zxFile is failed,retry count=" + count); } - if (count > 4) { + /* if (count > 4) { LOG.error("AsyncHttpClientPostFileCallback post zxFile is failed and already retry 3 times.error===>>>" + e + "<<<===.This message is===>>>" + sendMsg + "<<<==="); } else { if (configInfo != null && StringUtils.isNotBlank(sendMsg) && resultIs != null) { @@ -236,7 +237,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback>>" + e2); } @@ -253,7 +254,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback 1) { LOG.warn("AsyncHttpClientPostFileCallback post zxFile statusCode is abnormal,retry count=" + count); } - if (count > 4) { + /* if (count > 4) { LOG.error("AsyncHttpClientPostFileCallback post zxFile statusCode is abnormal and already retry 3 times.statusCode is{" + statusCode + "}.This message is===>>>" + sendMsg + "<<<==="); } else { if (configInfo != null && StringUtils.isNotBlank(sendMsg) && resultIs != null) { @@ -262,7 +263,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback>>" + e2); } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java index d47c4c2..aa70d8a 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java @@ -131,7 +131,7 @@ public class HttpAsyncClient { .register("https", new SSLIOSessionStrategy(sslcontext, NoopHostnameVerifier.INSTANCE)) .build(); - // 配置io线程 + // 配置io线程\ IOReactorConfig ioReactorConfig = IOReactorConfig.custom() .setSoKeepAlive(true) .setTcpNoDelay(true) @@ -185,7 +185,7 @@ public class HttpAsyncClient { return Long.parseLong(value) * 1000; } } - return 10 * 1000;//如果没有约定,则默认定义时长为10s + return 60 * 1000;//如果没有约定,则默认定义时长为10s } }; diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java index 9620be9..9fb923e 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java @@ -2,6 +2,7 @@ package cn.ac.iie.cusflume.sink.HttpAsyncUtils; import cn.ac.iie.cusflume.sink.CommonUtils.GetDataDictionaryCodeByTopicUtils; import cn.ac.iie.cusflume.sink.CommonUtils.GetFilePathByTopicUtils; +import cn.ac.iie.cusflume.sink.CommonUtils.HttpClientTest; import cn.ac.iie.cusflume.sink.HttpAsyncUtils.mail.AsyncPostMailFilesCallback; import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack; import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; @@ -78,18 +79,18 @@ public class HttpClientUtil { HttpPost httpPost = new HttpPost(baseUrl); //Parameters - LOG.warn("==== Parameters ======" + list); + LOG.info("==== Parameters ======" + list); CloseableHttpResponse response = null; try { httpPost.setEntity(new UrlEncodedFormEntity(list)); // httpPost.setHeader("Connection","close"); response = httpClient.execute(httpPost); - LOG.warn("========HttpResponseProxy:========" + response.getStatusLine()); + LOG.info("========HttpResponseProxy:========" + response.getStatusLine()); HttpEntity entity = response.getEntity(); String result = null; if (entity != null) { result = EntityUtils.toString(entity, "UTF-8"); - LOG.warn("========Response=======" + result); + LOG.info("========Response=======" + result); } EntityUtils.consume(entity); return result; @@ -814,10 +815,56 @@ public class HttpClientUtil { } + public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException { + HttpClientTest.producerAvroMessageToZX(urlProducer, topic, results, dataJson, userAgent, msgSessionCookie, count, postTime); + } + + + public static void asyncProducerAvroMessageToZX_toBatch(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + HttpPost httpPost = null; + urlProducer = urlProducer.trim(); + CloseableHttpAsyncClient httpClient = null; + try { + httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(); + httpClient.start(); + httpPost = new HttpPost(urlProducer); + httpPost.addHeader("User-Agent", userAgent); + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpPost.addHeader("Cookie", msgSessionCookie); + String md5Avro = MD5Utils.md5Encode(results); + httpPost.addHeader("Checksum", md5Avro); + logger.info("批量发送body Checksum MD5 为:" + md5Avro); + httpPost.addHeader("Content-Type", "binary/octet-stream"); + httpPost.addHeader("X-Tag", "getXTAG(dataJson, topic)"); + ByteArrayEntity payload = new ByteArrayEntity(results); + payload.setContentEncoding("utf-8"); + httpPost.setEntity(payload); + AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); + httpClient.execute(httpPost, asyncPostMsgCallBack); + logger.info("当前Thread number ID :" + Thread.currentThread().getId()); + } catch (MalformedURLException e) { + //执行URL url = new URL()的异常 + e.printStackTrace(); + } catch (ClientProtocolException e) { + // 执行httpClient.execute(httpGet)的异常 + e.printStackTrace(); + } catch (IOException e) { + // 执行httpClient.execute(httpGet)的异常 + logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志 + e.printStackTrace(); + } catch (Exception e) { + //handle response here... try other servers + logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志 + e.printStackTrace(); + } + + } + + /** * 批量生产消息-总线 */ - public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + public static void asyncProducerAvroMessageToZX_bk(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { HttpPost httpPost = null; urlProducer = urlProducer.trim(); CloseableHttpAsyncClient httpClient = null; @@ -838,6 +885,7 @@ public class HttpClientUtil { httpPost.setEntity(payload); AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); httpClient.execute(httpPost, asyncPostMsgCallBack); + logger.info("当前Thread number ID :" + Thread.currentThread().getId()); } catch (MalformedURLException e) { //执行URL url = new URL()的异常 e.printStackTrace(); @@ -1169,7 +1217,7 @@ public class HttpClientUtil { * @param dataJson * @return */ - private static String getXTAG(String dataJson, String topic) { + public static String getXTAG(String dataJson, String topic) { if ("monitor-msg".equals(topic) || "INFLUX-SAPP-BPS-STAT-LOG".equals(topic)) { return RealtimeCountConfig.MONITOR_NOFILE_MSG_X_TAG; } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java index af7670c..810e668 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java @@ -105,7 +105,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { public void completed(HttpResponse response) { // ProResBody proResBody = null; try { - int statuCode = response.getStatusLine().getStatusCode(); + int statusCode = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); String ret = null; if (entity != null) { @@ -129,12 +129,12 @@ public class AsyncPostMsgCallBack implements FutureCallback { /** * 20200818-接口细化响应码 */ - if (statuCode == 200 && resRedirBodyCode == 200) { - logger.info("数据加载成功,返回码: " + statuCode); + if (statusCode == 200 && resRedirBodyCode == 200) { + logger.info("数据加载成功,返回码: " + statusCode); logger.debug("生产数据==>" + dataJson + "<==," + "生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + - "数据加载成功,返回码: " + statuCode); + "数据加载成功,返回码: " + statusCode); AvroMonitorTimerTask.addSuccessNum(count); @@ -144,7 +144,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { case 300: logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," - + "<==,statuCode:" + statuCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); + + "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); //若不包含对应字段,则不进行对象转换,减少报错 if (ret.contains("redirect")) { ResRedirBody resRedirBody = JSONObject.parseObject(ret, ResRedirBody.class); @@ -159,19 +159,19 @@ public class AsyncPostMsgCallBack implements FutureCallback { YbHttpAvroSinkFile.redirectContents.add(dataJson); break; case 301: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); YbHttpAvroSinkFile.redirectContents.add(dataJson); break; case 410: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); YbHttpAvroSinkFile.updateCookie(); break; case 500: - logger.info("AsyncPostMsgCallBack==>statuCode:" + statuCode + logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:500,处理请求过程出现系统错误."); YbHttpAvroSinkFile.updateCookie(); @@ -179,7 +179,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { default: logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," - + "---statuCode:" + statuCode + "---resRedirBodyCode:" + resRedirBodyCode); + + "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode); break; } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java new file mode 100644 index 0000000..e550f3f --- /dev/null +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java @@ -0,0 +1,139 @@ +package cn.ac.iie.cusflume.sink; + +import cn.ac.iie.cusflume.sink.CommonUtils.SinkHttpClientUtil; +import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; +import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils; +import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.ParseException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; + +import java.io.IOException; + +public class SinkService { + private static final SinkService sinkService = new SinkService(); + + private SinkService() { + } + //SinkHttpClientUtil.getHttpClient(); + public static CloseableHttpClient httpClient = null; + private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class); + + public static SinkService getInstance() { + return sinkService; + } + + public void producerAvroMessageToBus( + String urlProducer, String topic, byte[] results, + String dataJson, String userAgent, String msgSessionCookie, int batchSize) { + + httpClient = SinkHttpClientUtil.getHttpClient(); + HttpPost httpPost = null; + urlProducer = urlProducer.trim(); + String msg = "-1"; + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + httpPost = new HttpPost(urlProducer); + httpPost.addHeader("User-Agent", userAgent); + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpPost.addHeader("Cookie", msgSessionCookie); + String md5Avro = MD5Utils.md5Encode(results); + httpPost.addHeader("Checksum", md5Avro); + LOG.info("批量发送body Checksum MD5 为:" + md5Avro); + httpPost.addHeader("Content-Type", "binary/octet-stream"); + httpPost.addHeader("X-Tag", cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil.getXTAG(dataJson, topic)); + ByteArrayEntity payload = new ByteArrayEntity(results); + payload.setContentEncoding("utf-8"); + httpPost.setEntity(payload); + long startTime = System.currentTimeMillis(); + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime)); + if (statusCode != HttpStatus.SC_OK) { + AvroMonitorTimerTask.addSuccessNum(batchSize); + LOG.info("数据总线响应内容:" + msg); + } else { + AvroMonitorTimerTask.addFailedNum(batchSize); + LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg); + + switch (statusCode) { + case 300: + LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); + //若不包含对应字段,则不进行对象转换,减少报错 + if (msg.contains("redirect")) { + ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class); + String redirectUrl = resRedirBody.getData().getRedirect(); + if (StringUtils.isNotBlank(redirectUrl)) { + YbHttpAvroSinkFile.changeUrl(redirectUrl); + } + } else { + LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); + } + + + break; + case 301: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); + break; + case 410: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + "服务端响应时间(ms)==>" + "<==," + + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); + YbHttpAvroSinkFile.updateCookie(); + break; + case 500: + LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + + "服务端响应时间(ms)==>" + "<==," + + ",resRedirBodyCode:500,处理请求过程出现系统错误."); + YbHttpAvroSinkFile.updateCookie(); + break; + default: + LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==," + + "---Status Code:" + statusCode); + break; + } + + } + + + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e); + } catch (ParseException e) { + LOG.error("解析错误: {}", e); + } catch (IOException e) { + LOG.error("IO错误: {}", e); + } catch (Exception e) { + LOG.error("其它错误: {}", e); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e); + } + } + } + + } + +} + diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 808f8cd..2ca287e 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -16,6 +16,7 @@ import org.apache.flume.sink.AbstractSink; import org.apache.log4j.Logger; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -25,7 +26,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private static Logger logger = Logger.getLogger(YbHttpAvroSinkFile.class); protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM); - + private static ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(); private static DataCenterLoad dcl; private static String postMsgUrl;//发送消息路径,配置文件获取,发送文件与发送消息皆需要 @@ -42,6 +43,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private static String userAgent;//业务系统编码systemId,该字段为系统的唯一编码,配置文件获取 private static String xTag;//标签编码tag,在总线中唯一标识该标签,配置文件获取--20191217笔记--貌似现在已经不需要这个参数作为头部了 + private static String dataJson; private static String msgSessionCookie;//消息会话标识,由响应返回 private static String fileSessionCookie;//文件会话标识,由响应返回,仅发送文件时需要---若只发送消息,则fileSessionCookie会一直为空-即仅广东需要 @@ -94,6 +96,10 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { logger.error("Starting YbHttpAvroSinkFile is error==>checkMsgUrl and postMsgUrl can not be null!!!!"); } + logger.warn("启动Sink File 执行程序 =============="); + //new Thread(new Consumer()).start(); + logger.warn("开启多线程消费队列数据=================="); + logger.warn("Starting YbHttpAvroSinkFile ... ..."); } @@ -172,10 +178,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); - Transaction transaction = null; -// AcResBody acCheckResBody = null; -// ProResBody producerResBody = null; + Transaction transaction = null; try { + logger.debug("Current Process Thread number ID :" + Thread.currentThread().getId()); transaction = channel.getTransaction(); transaction.begin(); Event event = null; @@ -214,6 +219,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { */ case "INFLUX-SAPP-BPS-STAT-LOG"://读取回写的influxDB合计数据用作状态上传 sendMsgLog(transaction, contents);//20191209移除文件发送,仅处理消息 + //sendMsgController(transaction, contents); break; default: logger.error("YbHttpAvroSinkFile can't find this topic:" + topicName + ".Please confirm this topicName is correct!!!"); @@ -280,8 +286,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } if (statusCode == 200) { if (StringUtils.isNotBlank(acCheckMsgMonitorResBody.getSessionId())) { - logger.warn("getMonitorSessionCookie-Thread.currentThread().getName()===>" + Thread.currentThread().getName()); - logger.warn("AC msgMonitor successfully,msgMonitor sessionId is ===>" + acCheckMsgMonitorResBody.getSessionId()); + logger.info("getMonitorSessionCookie-Thread.currentThread().getName()===>" + Thread.currentThread().getName()); + logger.info("AC msgMonitor successfully,msgMonitor sessionId is ===>" + acCheckMsgMonitorResBody.getSessionId()); monitorSessionCookie = acCheckMsgMonitorResBody.getSessionId(); } } else if (statusCode == 0) { @@ -308,21 +314,22 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { return configInfo; } + + /** - * 往zx发送文件数据的消息,即发送文件的message数据(结构化数据) - * 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224 - * + * 发送消息控制器 * @param transaction * @param contents */ - private void sendMsgLog(Transaction transaction, List contents) { - + private void sendMsgController(Transaction transaction, List contents) { try { + AvroMonitorTimerTask.addTotalNum(contents.size()); + AvroMonitorTimerTask.addReadyPostNum(contents.size()); /** * 获取状态回传sessionID,检查认证是否存在 */ if (StringUtils.isBlank(monitorSessionCookie) - || StringUtils.isBlank(msgSessionCookie)) { + || StringUtils.isBlank(msgSessionCookie)) { getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 if (!checkTimerStart) { @@ -338,9 +345,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { logger.warn("MonitorMsg Timer is started......"); } - /** - * 开启定时扫描重定向数据集合 - */ + if (!redirectContentsPostStart) { postRedirectDataEveryMin(); redirectContentsPostStart = true; @@ -349,8 +354,147 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } + if (isSingle(topicName)) { + + for (String content : contents) { + pool.execute(new Runnable() { + @Override + public void run() { + try { + + HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, + userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 + } catch (Exception e) { + logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); + } + } + }); + } + + } else { + xTag = HttpClientUtil.getXTAG(contents.get(0), topicName); + dataJson = contents.get(0); + int size = contents.size() / 100; + if (size > 0) { + for (int i = 0; i < size; i++) { + // byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents.subList(i*1, (i+1)*1)); + //concurrentLinkedQueue.add(msgResults); + pool.execute(new Producer(contents.subList(i * 100, (i + 1) * 100))); + } + if (contents.size() % 100 > 0) { + pool.execute(new Producer(contents.subList(size, contents.size()))); + } + } else { + pool.execute(new Producer(contents)); + } + } + + } catch (Exception e) { + logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); + transaction.commit(); + } finally { + if (transaction != null) { + transaction.commit(); + } + AvroMonitorTimerTask.subReadyPostNum(contents.size()); + } + + + + + + } + + + + + class Producer implements Runnable { + + private List contents; + public Producer(List contents) { ; + this.contents = contents; + } + @Override + public void run() { + try { + logger.debug("Current Producer Thread number ID :" + Thread.currentThread().getId()); + byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); + concurrentLinkedQueue.add(msgResults); + + } catch (Exception e) { + logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); + + } + + + } + } + + + class Consumer implements Runnable { + + @Override + public void run() { + logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId()); + while(true) { + + if (concurrentLinkedQueue.isEmpty()) { + logger.info("当前队列无数据,等待数据接入!"); + } else { + byte[] result = concurrentLinkedQueue.poll(); + HttpClientUtil.asyncProducerAvroMessageToZX_toBatch(postMsgUrl, topicName, result, dataJson, + userAgent, msgSessionCookie, 100, System.currentTimeMillis());//初始发送count计数为0 + logger.info("生产数据,等待数据接入!"); + } + } + } + } + + + /** + * 往zx发送文件数据的消息,即发送文件的message数据(结构化数据) + * 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224 + * + * @param transaction + * @param contents + */ + private void sendMsgLog(Transaction transaction, List contents) { + + try { + + logger.info("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId()); + + /** + * 获取状态回传sessionID,检查认证是否存在 + */ + if (StringUtils.isBlank(monitorSessionCookie) + || StringUtils.isBlank(msgSessionCookie)) { + getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie + getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 + if (!checkTimerStart) { + checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie + checkTimerStart = true; + logger.debug("CheckMsgAndFileCookie Timer is started......"); + } + + if (!monitorStart) {//消息定时上报 + AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl, + "monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息 + monitorStart = true; + logger.debug("MonitorMsg Timer is started......"); + } + + + if (!redirectContentsPostStart) { + postRedirectDataEveryMin(); + redirectContentsPostStart = true; + logger.debug("RedirectContents Timer Post is started......"); + } + } + + AvroMonitorTimerTask.addTotalNum(contents.size()); - logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); + logger.debug("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); AvroMonitorTimerTask.addReadyPostNum(contents.size()); if (isSingle(topicName)) { for (String content : contents) { @@ -371,28 +515,56 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } else { long beginAvroTime = System.currentTimeMillis(); byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); - logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) ); - pool.execute(new Runnable() { - @Override - public void run() { - try { - HttpClientUtil.asyncProducerAvroMessageToZX(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 - } catch (Exception e) { - logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); + logger.debug("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) ); + HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), + userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis()); + /* for (String content : contents) { + HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), + userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 + }*/ + + + /*for (String content : contents) { + + pool.execute(new Runnable() { + @Override + public void run() { + try { + + HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), + userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis()); + } catch (Exception e) { + logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); + } } - } - }); + }); + }*/ + + /* for (String content : contents) { + + pool.execute(new Runnable() { + @Override + public void run() { + try { + SinkService.getInstance().producerAvroMessageToBus(postMsgUrl, topicName, msgResults, contents.get(0), + userAgent, msgSessionCookie, contents.size()); + } catch (Exception e) { + logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); + } + } + }); + }*/ + } } catch (Exception e) { - logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); - transaction.commit(); - } finally { - if (transaction != null) { + logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); transaction.commit(); - } - AvroMonitorTimerTask.subReadyPostNum(contents.size()); + } finally { + if (transaction != null) { + transaction.commit(); + } + AvroMonitorTimerTask.subReadyPostNum(contents.size()); } }