diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java index f072cf6..9d8523f 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java @@ -1,30 +1,22 @@ package cn.ac.iie.cusflume.sink.CommonUtils; -import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientFactory; import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil; -import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack; import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils; -import com.alibaba.fastjson.JSON; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ResponseHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; import org.mortbay.log.Log; -import scala.util.Try; import java.io.IOException; -import java.net.MalformedURLException; public class HttpClientTest { diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java deleted file mode 100644 index acb69dd..0000000 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java +++ /dev/null @@ -1,232 +0,0 @@ -package cn.ac.iie.cusflume.sink.CommonUtils; - -import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile; -import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; -import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody; -import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.StringUtil; -import org.apache.commons.lang.StringUtils; -import org.apache.http.*; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpRequestRetryHandler; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.message.BasicHeaderElementIterator; -import org.apache.http.protocol.HTTP; -import org.apache.http.util.EntityUtils; -import org.apache.log4j.Logger; -import org.mortbay.log.Log; - -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.UnknownHostException; - -public class SinkHttpClientUtil { - private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class); - /** 全局连接池对象 */ - private static final PoolingHttpClientConnectionManager connManager = - new PoolingHttpClientConnectionManager(); - public static final String DEFAULT_CHARSET = "utf-8"; - private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整 - private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时 - private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数 - private static int maxPerRoute = RealtimeCountConfig.HTTP_ASYNC_MAXPERROUTE;//每个主机的并发最多只有1500 - private static int connectionRequestTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTIONREQUESTTIMEOUT; //从连接池中后去连接的timeout时间 - - - static { - connManager.setMaxTotal(poolSize); - connManager.setDefaultMaxPerRoute(maxPerRoute); - } - - public static CloseableHttpClient getHttpClient() { - RequestConfig requestConfig = RequestConfig.custom() - // 获取连接超时时间 - .setConnectionRequestTimeout(connectionRequestTimeout) - // 请求超时时间 - .setConnectTimeout(connectTimeout) - // 响应超时时间 - .setSocketTimeout(socketTimeout) - .build(); - - - /** - * 测出超时重试机制为了防止超时不生效而设置 - * 如果直接放回false,不重试 - * 这里会根据情况进行判断是否重试 - */ - HttpRequestRetryHandler retry = (exception, executionCount, context) -> { - if (executionCount >= 3) {// 如果已经重试了3次,就放弃 - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - if (!(request instanceof HttpEntityEnclosingRequest)) { - return true; - } - return false; - }; - - - ConnectionKeepAliveStrategy myStrategy = (response, context) -> { - HeaderElementIterator it = new BasicHeaderElementIterator - (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); - while (it.hasNext()) { - HeaderElement he = it.nextElement(); - String param = he.getName(); - String value = he.getValue(); - if (value != null && param.equalsIgnoreCase("timeout")) { - return Long.parseLong(value) * 1000; - } - } - return 60 * 1000;//如果没有约定,则默认定义时长为60s - }; - - // 创建httpClient - return HttpClients.custom() - // 把请求相关的超时信息设置到连接客户端 - .setDefaultRequestConfig(requestConfig) - // 把请求重试设置到连接客户端 - .setRetryHandler(retry) - .setKeepAliveStrategy(myStrategy) - // 配置连接池管理对象 - .setConnectionManager(connManager) - .build(); - - } - - - public static void httpPost(String url, byte[] requestBody, int batchSize, Header... headers) { - String msg = "-1"; - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(); - // 创建POST请求对象 - CloseableHttpResponse response = null; - try { - HttpPost httpPost = new HttpPost(url); - httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); - httpPost.setHeader("Content-Type", "application/json"); - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpPost.addHeader(h); - } - } - ByteArrayEntity payload = new ByteArrayEntity(requestBody); - payload.setContentEncoding("utf-8"); - httpPost.setEntity(payload); - long startTime = System.currentTimeMillis(); - response = httpClient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime)); - if (statusCode != HttpStatus.SC_OK ) { - AvroMonitorTimerTask.addSuccessNum(batchSize); - LOG.info("数据总线响应内容:" + msg); - } else { - AvroMonitorTimerTask.addFailedNum(batchSize); - LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg); - - switch (statusCode) { - case 300: - LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" - + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); - //若不包含对应字段,则不进行对象转换,减少报错 - if (msg.contains("redirect")) { - ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class); - String redirectUrl = resRedirBody.getData().getRedirect(); - if (StringUtils.isNotBlank(redirectUrl)) { - YbHttpAvroSinkFile.changeUrl(redirectUrl); - } - } else { - LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); - } - - - break; - case 301: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + - ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); - break; - case 410: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode - + "服务端响应时间(ms)==>" + "<==," - + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); - YbHttpAvroSinkFile.updateCookie(); - break; - case 500: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode - + "服务端响应时间(ms)==>" + "<==," - + ",resRedirBodyCode:500,处理请求过程出现系统错误."); - YbHttpAvroSinkFile.updateCookie(); - break; - default: - LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==," - + "---Status Code:" + statusCode ); - break; - } - - } - - - - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e); - } catch (ParseException e) { - LOG.error("解析错误: {}", e); - } catch (IOException e) { - LOG.error("IO错误: {}", e); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e); - } - } - } - - } - - - - - - - - - -} diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java index 944d5ed..66d7e88 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java @@ -52,7 +52,7 @@ public class AsyncHttpClientGetFileCallback implements FutureCallback schemaHashMap = new HashMap();//用于存放Schma + private static Logger logger = Logger.getLogger(HttpClientUtil.class); protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM);//线程池 + private final static BlockingQueue retryQueue = new ArrayBlockingQueue(100000); + private static String utf8Charset = "utf-8"; private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); - /** - * 向指定的url发送一次post请求,参数是List - * - * @param baseUrl 请求地址 - * @param list 请求参数,格式是List - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static String httpSyncPost(String baseUrl, List list) { - - CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient(); - HttpPost httpPost = new HttpPost(baseUrl); - - //Parameters - LOG.info("==== Parameters ======" + list); - CloseableHttpResponse response = null; - try { - httpPost.setEntity(new UrlEncodedFormEntity(list)); -// httpPost.setHeader("Connection","close"); - response = httpClient.execute(httpPost); - LOG.info("========HttpResponseProxy:========" + response.getStatusLine()); - HttpEntity entity = response.getEntity(); - String result = null; - if (entity != null) { - result = EntityUtils.toString(entity, "UTF-8"); - LOG.info("========Response=======" + result); - } - EntityUtils.consume(entity); - return result; - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return null; - } - - /** - * 向指定的url发送一次post请求,参数是字符串 - * - * @param baseUrl 请求地址 - * @param postString 请求参数,格式是json.toString() - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestBody接收参数 - */ - public static String httpSyncPost(String baseUrl, String postString) { - - CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient(); - HttpPost httpPost = new HttpPost(baseUrl); - //parameters - LOG.warn("==== Parameters ======" + postString); - CloseableHttpResponse response = null; - try { - if (postString == null || "".equals(postString)) { - throw new Exception("missing post String"); - } - - StringEntity stringEntity = new StringEntity(postString.toString(), utf8Charset); - stringEntity.setContentEncoding("UTF-8"); - stringEntity.setContentType("application/json"); - httpPost.setEntity(stringEntity); - - response = httpClient.execute(httpPost); - LOG.warn("========HttpResponseProxy:========" + response.getStatusLine()); - HttpEntity entity = response.getEntity(); - String result = null; - if (entity != null) { - result = EntityUtils.toString(entity, "UTF-8"); - LOG.warn("========Response=======" + result); - } - EntityUtils.consume(entity); - return result; - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return null; - } - /** - * 向指定的url发送一次get请求,参数是List - * - * @param baseUrl 请求地址 - * @param list 请求参数,格式是List - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static String httpSyncGet(String baseUrl, List list) { - - CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient(); - HttpGet httpGet = new HttpGet(baseUrl); - //Parameters - LOG.warn("==== Parameters ======" + list); - CloseableHttpResponse response = null; - try { - - if (list != null) { - String getUrl = EntityUtils - .toString(new UrlEncodedFormEntity(list)); - httpGet.setURI(new URI(httpGet.getURI().toString() - + "?" + getUrl)); - } else { - httpGet.setURI(new URI(httpGet.getURI().toString())); - } - - response = httpClient.execute(httpGet); - LOG.warn("========HttpResponseProxy:========" + response.getStatusLine()); - HttpEntity entity = response.getEntity(); - String result = null; - if (entity != null) { - result = EntityUtils.toString(entity, "UTF-8"); - LOG.warn("========Response=======" + result); - } - EntityUtils.consume(entity); - return result; - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return null; - } - - /** - * 向指定的url发送一次get请求,参数是字符串 - * - * @param baseUrl 请求地址 - * @param urlParams 请求参数,格式是String - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static String httpSyncGet(String baseUrl, String urlParams) { - - CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient(); - HttpGet httpGet = new HttpGet(baseUrl); - //Parameters - LOG.warn("==== Parameters ======" + urlParams); - CloseableHttpResponse response = null; - try { - - if (null != urlParams || "".equals(urlParams)) { - - httpGet.setURI(new URI(httpGet.getURI().toString() - + "?" + urlParams)); - } else { - httpGet.setURI(new URI(httpGet.getURI().toString())); - } - - response = httpClient.execute(httpGet); - LOG.warn("========HttpResponseProxy:========" + response.getStatusLine()); - HttpEntity entity = response.getEntity(); - String result = null; - if (entity != null) { - result = EntityUtils.toString(entity, "UTF-8"); - LOG.warn("========Response=======" + result); - } - EntityUtils.consume(entity); - return result; - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return null; - } - - - /** - * 向指定的url发送一次get请求,参数是字符串 - * - * @param baseUrl 请求地址 - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static String httpSyncGet(String baseUrl) { - - CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient(); - HttpGet httpGet = new HttpGet(baseUrl); - - CloseableHttpResponse response = null; - try { - httpGet.setURI(new URI(httpGet.getURI().toString())); - response = httpClient.execute(httpGet); - LOG.warn("========HttpResponseProxy:========" + response.getStatusLine()); - HttpEntity entity = response.getEntity(); - String result = null; - if (entity != null) { - result = EntityUtils.toString(entity, "UTF-8"); - LOG.warn("========Response=======" + result); - } - EntityUtils.consume(entity); - return result; - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return null; - } - - - /** - * 向指定的url发送一次异步post请求,参数是字符串 - * - * @param baseUrl 请求地址 - * @param postString 请求参数,格式是json.toString() - * @param urlParams 请求参数,格式是String - * @param callback 回调方法,格式是FutureCallback - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static void httpAsyncPost(String baseUrl, String postString, - String urlParams, FutureCallback callback) throws Exception { - if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); - throw new Exception("missing base url"); - } - CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool() - .getAsyncHttpClient(); - try { - hc.start(); - HttpPost httpPost = new HttpPost(baseUrl); - -// httpPost.setHeader("Connection","close"); - - if (null != postString) { - LOG.debug("exeAsyncReq post postBody={" + postString + "}"); - StringEntity entity = new StringEntity(postString.toString(), utf8Charset); - entity.setContentEncoding("UTF-8"); - entity.setContentType("application/json"); - httpPost.setEntity(entity); - } - - if (null != urlParams) { - - httpPost.setURI(new URI(httpPost.getURI().toString() - + "?" + urlParams)); - } - - LOG.warn("exeAsyncReq getparams:" + httpPost.getURI()); - - hc.execute(httpPost, callback); - - } catch (Exception e) { - e.printStackTrace(); - } - } - - - /** - * 向指定的url发送一次异步post请求,参数是字符串 - * - * @param baseUrl 请求地址 - * @param urlParams 请求参数,格式是List - * @param callback 回调方法,格式是FutureCallback - * @return 返回结果, 请求失败时返回null - * @apiNote http接口处用 @RequestParam接收参数 - */ - public static void httpAsyncPost(String baseUrl, List postBody, - List urlParams, FutureCallback callback) throws Exception { - if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); - throw new Exception("missing base url"); - } - - try { - CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool() - .getAsyncHttpClient(); - - hc.start(); - - HttpPost httpPost = new HttpPost(baseUrl); - -// httpPost.setHeader("Connection","close"); - - if (null != postBody) { - LOG.debug("exeAsyncReq post postBody={" + postBody + "}"); - UrlEncodedFormEntity entity = new UrlEncodedFormEntity( - postBody, "UTF-8"); - httpPost.setEntity(entity); - } - - if (null != urlParams) { - - String getUrl = EntityUtils - .toString(new UrlEncodedFormEntity(urlParams)); - - httpPost.setURI(new URI(httpPost.getURI().toString() - + "?" + getUrl)); - } - - LOG.warn("exeAsyncReq getparams:" + httpPost.getURI()); - - - hc.execute(httpPost, callback); - - } catch (Exception e) { - e.printStackTrace(); - } + public static BlockingQueue getRetryQueue() { + return retryQueue; } /** @@ -409,7 +84,7 @@ public class HttpClientUtil { */ public static void httpAsyncPostFileToZx(String baseUrl, byte[] fileIs, AsyncHttpClientPostFileCallback callback) throws Exception { if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); + logger.warn("we don't have base url, check config"); throw new Exception("missing base url"); } @@ -433,7 +108,7 @@ public class HttpClientUtil { if (null != fileIs) { httpPut.setEntity(new ByteArrayEntity(fileIs)); } - LOG.info("File httpPut.getURI()===>>>" + httpPut.getURI()); + logger.info("File httpPut.getURI()===>>>" + httpPut.getURI()); localContext.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore); @@ -454,7 +129,7 @@ public class HttpClientUtil { */ public static void httpAsyncPostFileToZx(String baseUrl, byte[] fileIs, AsyncPostMailFilesCallback callback) throws Exception { if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); + logger.warn("we don't have base url, check config"); throw new Exception("missing base url"); } @@ -478,7 +153,7 @@ public class HttpClientUtil { if (null != fileIs) { httpPut.setEntity(new ByteArrayEntity(fileIs)); } - LOG.info("File httpPut.getURI()===>>>" + httpPut.getURI()); + logger.info("File httpPut.getURI()===>>>" + httpPut.getURI()); localContext.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore); @@ -503,7 +178,7 @@ public class HttpClientUtil { public static void httpAsyncGetFile(String baseUrl, FutureCallback callback) throws Exception { if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); + logger.warn("we don't have base url, check config"); throw new Exception("missing base url"); } CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(); @@ -523,7 +198,7 @@ public class HttpClientUtil { // } // LOG.info("exeAsyncReq getparams:" + httpGet.getURI()); - LOG.info("File httpGet.getURI()===>>>" + httpGet.getURI()); + logger.info("File httpGet.getURI()===>>>" + httpGet.getURI()); hc.execute(httpGet, callback); } catch (Exception e) { e.printStackTrace(); @@ -545,7 +220,7 @@ public class HttpClientUtil { public static void httpAsyncGet(String baseUrl, String urlParams, FutureCallback callback) throws Exception { if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); + logger.warn("we don't have base url, check config"); throw new Exception("missing base url"); } CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool() @@ -567,7 +242,7 @@ public class HttpClientUtil { httpGet.setURI(new URI(httpGet.getURI().toString())); } - LOG.warn("exeAsyncReq getparams:" + httpGet.getURI()); + logger.warn("exeAsyncReq getparams:" + httpGet.getURI()); hc.execute(httpGet, callback); @@ -590,7 +265,7 @@ public class HttpClientUtil { */ public static void httpAsyncGet(String baseUrl, List urlParams, FutureCallback callback) throws Exception { if (baseUrl == null || "".equals(baseUrl)) { - LOG.warn("we don't have base url, check config"); + logger.warn("we don't have base url, check config"); throw new Exception("missing base url"); } @@ -613,7 +288,7 @@ public class HttpClientUtil { + "?" + getUrl)); } - LOG.warn("exeAsyncReq getparams:" + httpGet.getURI()); + logger.warn("exeAsyncReq getparams:" + httpGet.getURI()); hc.execute(httpGet, callback); @@ -624,126 +299,6 @@ public class HttpClientUtil { } - public static String OkSyncPost(String url, String json) throws IOException { - - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - RequestBody body = RequestBody.create(JSON, json); - Request request = new Request.Builder() - .url(url) - .post(body) - .build(); - try (Response response = okClient.newCall(request).execute()) { - - return response.body().string(); - } - } - - public static void OkAsyncPost(String url, String json) throws IOException { - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - RequestBody body = RequestBody.create(JSON, json); - Request request = new Request.Builder() - .url(url) - .post(body) - .build(); - Call call = okClient.newCall(request); - call.enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - e.printStackTrace(); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - - LOG.warn("OkAsyncPost回调:" + response.body().string()); - } - }); - - } - - - public static void OkAsyncPost(String url, Map map) throws IOException { - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - FormBody.Builder formBodyBuilder = new FormBody.Builder(); - for (Map.Entry entry : map.entrySet()) { - formBodyBuilder.add(entry.getKey(), entry.getValue()); - } - Request request = new Request.Builder() - .url(url) - .post(formBodyBuilder.build()) - .build(); - Call call = okClient.newCall(request); - call.enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - e.printStackTrace(); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - - LOG.warn("OkAsyncPost回调:" + response.body().string()); - } - }); - - } - - public static void OkAsyncPost(String url, Map map, Callback callback) throws IOException { - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - FormBody.Builder formBodyBuilder = new FormBody.Builder(); - for (Map.Entry entry : map.entrySet()) { - formBodyBuilder.add(entry.getKey(), entry.getValue()); - } - - Request request = new Request.Builder() - .url(url) - .post(formBodyBuilder.build()) - .build(); - Call call = okClient.newCall(request); - call.enqueue(callback); - - } - - public static String OkSyncGet(String url) throws IOException { - - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - Request request = new Request.Builder() - .url(url) - .build(); - try (Response response = okClient.newCall(request).execute()) { - - return response.body().string(); - } - } - - public static void OkAsyncGet(String url) throws IOException { - - OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient(); - - Request request = new Request.Builder() - .url(url) - .build(); - Call call = okClient.newCall(request); - call.enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - e.printStackTrace(); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - - LOG.warn("OkAsyncGet回调:" + response.body().string()); - } - }); - } - - /** * 获取单个文件并传递单个文件时使用 * @@ -759,15 +314,17 @@ public class HttpClientUtil { httpAsyncGetFile(getFileUrl, callback); // LOG.info(Thread.currentThread().getName() + "===>run====>>>" + "success,now time is" + System.currentTimeMillis()); } catch (Exception e) { - LOG.error("getFileAndPostFile multithreading is error===>" + e); + logger.error("getFileAndPostFile multithreading is error===>" + e); } } }); } catch (Exception e) { - LOG.error("getFileAndPostFile method is error===>" + e); + logger.error("getFileAndPostFile method is error===>" + e); } } + + /** * 对消息数据进行批量组装 * @param contents @@ -815,90 +372,66 @@ public class HttpClientUtil { } - public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException { + /** + * 用于测试总线的同步发送客户端接口,不作为生产环境使用 + * @param urlProducer + * @param topic + * @param results + * @param dataJson + * @param userAgent + * @param msgSessionCookie + * @param count + * @param postTime + * @throws IOException + */ + public static void syncProducerAvroMessageTest(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException { HttpClientTest.producerAvroMessageToZX(urlProducer, topic, results, dataJson, userAgent, msgSessionCookie, count, postTime); } - public static void asyncProducerAvroMessageToZX_toBatch(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { - HttpPost httpPost = null; - urlProducer = urlProducer.trim(); - CloseableHttpAsyncClient httpClient = null; - try { - httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(); - httpClient.start(); - httpPost = new HttpPost(urlProducer); - httpPost.addHeader("User-Agent", userAgent); - httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); - httpPost.addHeader("Cookie", msgSessionCookie); - String md5Avro = MD5Utils.md5Encode(results); - httpPost.addHeader("Checksum", md5Avro); - logger.info("批量发送body Checksum MD5 为:" + md5Avro); - httpPost.addHeader("Content-Type", "binary/octet-stream"); - httpPost.addHeader("X-Tag", "getXTAG(dataJson, topic)"); - ByteArrayEntity payload = new ByteArrayEntity(results); - payload.setContentEncoding("utf-8"); - httpPost.setEntity(payload); - AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); - httpClient.execute(httpPost, asyncPostMsgCallBack); - logger.info("当前Thread number ID :" + Thread.currentThread().getId()); - } catch (MalformedURLException e) { - //执行URL url = new URL()的异常 - e.printStackTrace(); - } catch (ClientProtocolException e) { - // 执行httpClient.execute(httpGet)的异常 - e.printStackTrace(); - } catch (IOException e) { - // 执行httpClient.execute(httpGet)的异常 - logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志 - e.printStackTrace(); - } catch (Exception e) { - //handle response here... try other servers - logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志 - e.printStackTrace(); - } - - } - - /** - * 批量生产消息-总线 + * 异步批量发送接口 + * @param batchProduceUrl + * @param topic + * @param results + * @param xTag + * @param userAgent + * @param msgSessionCookie + * @param count + * @param postTime */ - public static void asyncProducerAvroMessageToZX_bk(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + public static void batchAsyncProduceMessage(String batchProduceUrl, String topic, + byte[] results, + String xTag, String userAgent, + String msgSessionCookie, int count, long postTime) { HttpPost httpPost = null; - urlProducer = urlProducer.trim(); + batchProduceUrl = batchProduceUrl.trim(); CloseableHttpAsyncClient httpClient = null; try { httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(); httpClient.start(); - httpPost = new HttpPost(urlProducer); + httpPost = new HttpPost(batchProduceUrl); httpPost.addHeader("User-Agent", userAgent); httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); httpPost.addHeader("Cookie", msgSessionCookie); - String md5Avro = MD5Utils.md5Encode(results); - httpPost.addHeader("Checksum", md5Avro); - logger.info("批量发送body Checksum MD5 为:" + md5Avro); + String checksum = MD5Utils.md5Encode(results); + httpPost.addHeader("Checksum", checksum); httpPost.addHeader("Content-Type", "binary/octet-stream"); - httpPost.addHeader("X-Tag", getXTAG(dataJson, topic)); + httpPost.addHeader("X-Tag", xTag); ByteArrayEntity payload = new ByteArrayEntity(results); payload.setContentEncoding("utf-8"); httpPost.setEntity(payload); - AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); - httpClient.execute(httpPost, asyncPostMsgCallBack); - logger.info("当前Thread number ID :" + Thread.currentThread().getId()); + AsyncBatchMsgCallBack asyncBatchMsgCallBack = new AsyncBatchMsgCallBack(batchProduceUrl, topic, checksum, count, postTime); + httpClient.execute(httpPost, asyncBatchMsgCallBack); } catch (MalformedURLException e) { - //执行URL url = new URL()的异常 e.printStackTrace(); } catch (ClientProtocolException e) { - // 执行httpClient.execute(httpGet)的异常 e.printStackTrace(); } catch (IOException e) { - // 执行httpClient.execute(httpGet)的异常 - logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志 + logger.error("IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + batchProduceUrl + "<=="); e.printStackTrace(); } catch (Exception e) { - //handle response here... try other servers - logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志 + logger.error("Other Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + batchProduceUrl + "<=="); e.printStackTrace(); } @@ -909,16 +442,18 @@ public class HttpClientUtil { * 生产AVRO数据入ZX(单条)--数据不包含schema * 静态,适用于异步与多线程的版本 * - * @param urlProducer + * @param singleProduceUrl * @param topic * @param dataJson * @param userAgent * @param msgSessionCookie * @return */ - public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + public static void singleAsyncProduceMessage(String singleProduceUrl, String topic, + String dataJson, String userAgent, + String msgSessionCookie, int count, long postTime) { HttpPost httpPost = null; - urlProducer = urlProducer.trim(); + singleProduceUrl = singleProduceUrl.trim(); byte[] resultArray = null; CloseableHttpAsyncClient httpClient = null; try { @@ -961,7 +496,7 @@ public class HttpClientUtil { resultArray = alreadyGetFileTagRecordSoOnlyGetMergeAllArray(topic, resultArray); } - httpPost = new HttpPost(urlProducer); + httpPost = new HttpPost(singleProduceUrl); // set header httpPost.addHeader("User-Agent", userAgent); @@ -974,16 +509,11 @@ public class HttpClientUtil { httpPost.addHeader("Cookie", msgSessionCookie);//不设置Cookie时,广东测试出现报错,打开广东的Cookie设置测试一下,经测试,可用 - try { - String md5Avro = MD5Utils.md5Encode(resultArray); - httpPost.addHeader("Checksum", md5Avro); - logger.info("请求端Checksum MD5 avro 加密为:" + md5Avro); - logger.debug("原始数据==>" + dataJson + "<==," + - "原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + - "对应请求端Checksum MD5 avro 加密为:" + md5Avro);//20200521新增,自证清白 - } catch (Exception e) { - logger.error("MD5Utils.md5Encode Method is error===>>> " + e); - } + + String md5Avro = MD5Utils.md5Encode(resultArray); + httpPost.addHeader("Checksum", md5Avro); + + // httpPost.addHeader("Content-Type", "application/avro+json;charset=UTF-8"); httpPost.addHeader("Content-Type", "binary/octet-stream"); @@ -999,22 +529,16 @@ public class HttpClientUtil { ByteArrayEntity payload = new ByteArrayEntity(resultArray); payload.setContentEncoding("utf-8"); //payload.setContentType("text/xml; charset=UTF-8"); - // anti avro httpPost.setEntity(payload); - - - logger.info("最终加载内容字节数组长度: " + resultArray.length); -// logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志 - logger.debug("原始数据==>" + dataJson + "<==," + +// logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志 + logger.info("原始数据==>" + dataJson + "<==," + "原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + "数据处理时间handleTime==>" + (System.currentTimeMillis() - postTime) + "<==," + "最终加载内容字节数组长度: " + resultArray.length + "," + - "最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志 - + "对应请求端Checksum MD5 avro 加密为:" + md5Avro);//20200521进一步细化日志 //执行请求 - AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime); + AsyncSingleMsgCallBack asyncPostMsgCallBack = new AsyncSingleMsgCallBack(singleProduceUrl, topic, dataJson, userAgent, msgSessionCookie, count, postTime); httpClient.execute(httpPost, asyncPostMsgCallBack); - } catch (MalformedURLException e) { //执行URL url = new URL()的异常 e.printStackTrace(); @@ -1023,11 +547,13 @@ public class HttpClientUtil { e.printStackTrace(); } catch (IOException e) { // 执行httpClient.execute(httpGet)的异常 - logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志 + logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志 e.printStackTrace(); } catch (Exception e) { //handle response here... try other servers - logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志 + logger.error("asyncProducerAvroToZX is Exception===>>>" + + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志 e.printStackTrace(); } } @@ -1049,8 +575,7 @@ public class HttpClientUtil { } } - private static HashMap schemaHashMap = new HashMap();//用于存放Schema - private static Logger logger = Logger.getLogger(HttpClientUtil.class); + /** * 获取数据中的日志标签并将所有相关数据字节数组化后拼接返回 diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java index 9414163..6e04098 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java @@ -52,7 +52,7 @@ public class AsyncGetMailFilesCallback implements FutureCallback { public AsyncGetMailFilesCallback(ConfigInfo configInfo, String getFileUrl, String sendMsg, AsyncPostMailFilesCallback asyncPostMailFilesCallback, int count, int urlCount, int dealUrlCount, LinkedList attachmentsUrl) { this.configInfo = configInfo; this.postFileUrl = configInfo.getPostFileUrl();//通过configInfo赋值 - this.postMsgUrl = configInfo.getPostMsgUrl();//通过configInfo赋值 + this.postMsgUrl = configInfo.getSingleProduceUrl();//通过configInfo赋值 this.getFileUrl = getFileUrl;//初次存储的是eml_file_url,后续每次存储的都是当次需要请求的文件路径 this.sendMsg = sendMsg; this.asyncPostMailFilesCallback = asyncPostMailFilesCallback; diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java index ccf0729..1148ce1 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java @@ -65,7 +65,7 @@ public class AsyncPostMailFilesCallback implements FutureCallback public AsyncPostMailFilesCallback(ConfigInfo configInfo, String sendMsg, int count, int isCount, int dealIsCount, LinkedList attachmentsIdList) { this.configInfo = configInfo; this.postFileUrl = configInfo.getPostFileUrl(); - this.postMsgUrl = configInfo.getPostMsgUrl(); + this.postMsgUrl = configInfo.getSingleProduceUrl(); this.sendMsg = sendMsg;//存放对应于url的数据 this.count = count;//初始为0 this.isCount = isCount;//初始总数为url总数 @@ -275,7 +275,7 @@ public class AsyncPostMailFilesCallback implements FutureCallback AvroMonitorTimerTask.msgReadyPostSum++;//多个文件对应一条消息 //开始推送消息进入总线 - ProResBody proResBody = dclAsyncPost.avroDataLoad(configInfo.getPostMsgUrl(), configInfo.getTopicName(), sendMsg, configInfo.getBatchSize(), configInfo.getUserAgent(), configInfo.getMsgSessionCookie()); + ProResBody proResBody = dclAsyncPost.avroDataLoad(configInfo.getSingleProduceUrl(), configInfo.getTopicName(), sendMsg, configInfo.getBatchSize(), configInfo.getUserAgent(), configInfo.getMsgSessionCookie()); LOG.info("Send message with many fileId to zx over,this responseBody is===>" + proResBody.toString()); } else if (dealIsCount < isCount) {//继续发送余下文件流获取id // 注意此处dealIsCount必然是>=1的,所以不需要考虑eml_file,因为eml_file必然已经完成,直接发送attachmentsResultIsList中的流获取id diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java new file mode 100644 index 0000000..43c8eab --- /dev/null +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java @@ -0,0 +1,206 @@ +package cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack; + +import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil; +import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile; +import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; +import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.utils.HttpClientUtils; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Map; + +/** + * 被回调的对象,给异步的httpclient使用 + */ +public class AsyncBatchMsgCallBack implements FutureCallback { + private static Logger logger = Logger.getLogger(AsyncBatchMsgCallBack.class); + + private String batchProduceUrl; + private String topicName; + private String checksum; + private int count; + private long postTime; + + public AsyncBatchMsgCallBack(String batchProduceUrl, String topicName, + String checksum, int count, long postTime) { + this.batchProduceUrl = batchProduceUrl; + this.topicName = topicName; + this.count = count; + this.postTime = postTime; + this.checksum = checksum; + } + + + public String getBatchProduceUrl() { + return batchProduceUrl; + } + + public void setBatchProduceUrl(String batchProduceUrl) { + this.batchProduceUrl = batchProduceUrl; + } + + public String getChecksum() { + return checksum; + } + + public void setChecksum(String checksum) { + this.checksum = checksum; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public long getPostTime() { + return postTime; + } + + public void setPostTime(long postTime) { + this.postTime = postTime; + } + + /** + * 请求完成后调用该函数 + */ + @Override + public void completed(HttpResponse response) { + try { + int statusCode = response.getStatusLine().getStatusCode(); + + HttpEntity entity = response.getEntity(); + + String result = null; + if (entity != null) { + result = EntityUtils.toString(entity, "UTF-8"); + } + + /** + * 不直接进行对象转换,除非数据加载不成功 + */ + Map map = JSONObject.parseObject(result, Map.class); + + int resRedirBodyCode = 0; + + if (map != null) { + resRedirBodyCode = (int) map.get("code"); + } + + /** + * 20200818-接口细化响应码 + */ + if (statusCode == 200 && resRedirBodyCode == 200) { + logger.info( + "数据加载成功,返回码: " + statusCode + + "生产数据checksum==>" + checksum + "<==," + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + "数据加载成功,返回码: " + statusCode); + AvroMonitorTimerTask.addSuccessNum(count); + } else { + AvroMonitorTimerTask.addFailedNum(count); + switch (resRedirBodyCode) { + case 300: + logger.error("AsyncPostBatchMsgCallBack==>重定向响应体-redirect-ret==>" + result + + "总线批量发送URL==>" + batchProduceUrl + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); + + //若不包含对应字段,则不进行对象转换,减少报错 + if (result.contains("redirect")) { + ResRedirBody resRedirBody = JSONObject.parseObject(result, ResRedirBody.class); + String redirectUrl = resRedirBody.getData().getRedirect(); + if (StringUtils.isNotBlank(redirectUrl)) { + YbHttpAvroSinkFile.changeUrl(redirectUrl); + } + } else { + logger.error("AsyncPostBatchMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); + } + + break; + case 301: + logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode + + "总线批量发送URL==>" + batchProduceUrl + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); + break; + case 410: + logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode + + "总线批量发送URL==>" + batchProduceUrl + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); + YbHttpAvroSinkFile.updateCookie(); + break; + case 500: + logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode + + "总线批量发送URL==>" + batchProduceUrl + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + ",resRedirBodyCode:500,处理请求过程出现系统错误."); + YbHttpAvroSinkFile.updateCookie(); + break; + default: + logger.error("AsyncPostBatchMsgCallBack==>数据加载失败,响应体:" + result + + "总线批量发送URL==>" + batchProduceUrl + + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + + "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode); + break; + } + } + + if (entity != null) { + try { + EntityUtils.consume(entity); + } catch (final IOException ex) { + logger.error("IOException : " + ex); + } + } + + } catch (Exception e) { + logger.error("AsyncPostBatchMsgCallBack Get response from ZX is error===>>>" + + e.getMessage()); + e.printStackTrace(); + } finally { + HttpClientUtils.closeQuietly(response); + } + } + + /** + * 请求取消后调用该函数 + */ + @Override + public void cancelled() { + logger.error("AsyncPostMagCallBack Request is cancelled"); + } + + /** + * 请求失败后调用该函数 + */ + @Override + public void failed(Exception e) { + logger.error("AsyncPostMagCallBack Request is Failed,This Failed data checksum is ==>"+ checksum); + } + +} + + + + + + diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java similarity index 75% rename from yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java rename to yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java index 810e668..2d7eb33 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java @@ -15,14 +15,13 @@ import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.Arrays; import java.util.Map; /** * 被回调的对象,给异步的httpclient使用 */ -public class AsyncPostMsgCallBack implements FutureCallback { - private static Logger logger = Logger.getLogger(AsyncPostMsgCallBack.class); +public class AsyncSingleMsgCallBack implements FutureCallback { + private static Logger logger = Logger.getLogger(AsyncSingleMsgCallBack.class); private String postMsgUrl; private String topicName; @@ -32,7 +31,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { private int count; private long postTime; - public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { + public AsyncSingleMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) { this.postMsgUrl = postMsgUrl; this.topicName = topicName; this.dataJson = dataJson; @@ -103,7 +102,6 @@ public class AsyncPostMsgCallBack implements FutureCallback { */ @Override public void completed(HttpResponse response) { -// ProResBody proResBody = null; try { int statusCode = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); @@ -121,11 +119,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { if (map != null) { resRedirBodyCode = (int) map.get("code"); } -// int resRedirBodyCode = resRedirBody.getCode(); - /* logger.debug("生产数据==>" + dataJson + "<==," + - "生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," + - "返回的生产原始响应体String数据为:" + ret);*/ /** * 20200818-接口细化响应码 */ @@ -137,12 +131,11 @@ public class AsyncPostMsgCallBack implements FutureCallback { "数据加载成功,返回码: " + statusCode); AvroMonitorTimerTask.addSuccessNum(count); - } else { AvroMonitorTimerTask.addFailedNum(count); switch (resRedirBodyCode) { case 300: - logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret + logger.error("AsyncSingleMsgCallBack==>重定向响应体-redirect-ret==>" + ret + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); //若不包含对应字段,则不进行对象转换,减少报错 @@ -153,33 +146,35 @@ public class AsyncPostMsgCallBack implements FutureCallback { YbHttpAvroSinkFile.changeUrl(redirectUrl); } } else { - logger.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); + logger.error("AsyncSingleMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); } - YbHttpAvroSinkFile.redirectContents.add(dataJson); + HttpClientUtil.getRetryQueue().put(dataJson); break; case 301: - logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); - YbHttpAvroSinkFile.redirectContents.add(dataJson); + HttpClientUtil.getRetryQueue().put(dataJson); break; case 410: - logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); YbHttpAvroSinkFile.updateCookie(); break; case 500: - logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode + logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + ",resRedirBodyCode:500,处理请求过程出现系统错误."); YbHttpAvroSinkFile.updateCookie(); break; default: - logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret + + logger.error("AsyncSingleMsgCallBack==>数据加载失败,响应体:" + ret + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," + "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode); + break; } } @@ -188,11 +183,12 @@ public class AsyncPostMsgCallBack implements FutureCallback { try { EntityUtils.consume(entity); } catch (final IOException ex) { + logger.error("IOException : " + ex); } } } catch (Exception e) { - logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志 + logger.error("AsyncSingleMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志 e.printStackTrace(); } finally { HttpClientUtils.closeQuietly(response); @@ -204,7 +200,7 @@ public class AsyncPostMsgCallBack implements FutureCallback { */ @Override public void cancelled() { - logger.error("AsyncPostMagCallBack Request is cancelled"); + logger.error("AsyncSingleMsgCallBack Request is cancelled"); } /** @@ -212,15 +208,8 @@ public class AsyncPostMsgCallBack implements FutureCallback { */ @Override public void failed(Exception e) { - count++; - logger.info("AsyncPostMagCallBack Request is Failed,This Failed data is ==>" + dataJson + "<==,Retry count=" + count); - if (count > 1) { - - AvroMonitorTimerTask.addFailedNum(1); - logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString()); - } else { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试 - } + logger.error("AsyncSingleMsgCallBack Request is Failed,This Failed data is ==>" + + dataJson ); } } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java deleted file mode 100644 index e550f3f..0000000 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java +++ /dev/null @@ -1,139 +0,0 @@ -package cn.ac.iie.cusflume.sink; - -import cn.ac.iie.cusflume.sink.CommonUtils.SinkHttpClientUtil; -import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask; -import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils; -import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody; -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.lang.StringUtils; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.ParseException; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.protocol.HTTP; -import org.apache.http.util.EntityUtils; -import org.apache.log4j.Logger; -import org.mortbay.log.Log; - -import java.io.IOException; - -public class SinkService { - private static final SinkService sinkService = new SinkService(); - - private SinkService() { - } - //SinkHttpClientUtil.getHttpClient(); - public static CloseableHttpClient httpClient = null; - private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class); - - public static SinkService getInstance() { - return sinkService; - } - - public void producerAvroMessageToBus( - String urlProducer, String topic, byte[] results, - String dataJson, String userAgent, String msgSessionCookie, int batchSize) { - - httpClient = SinkHttpClientUtil.getHttpClient(); - HttpPost httpPost = null; - urlProducer = urlProducer.trim(); - String msg = "-1"; - // 创建POST请求对象 - CloseableHttpResponse response = null; - try { - httpPost = new HttpPost(urlProducer); - httpPost.addHeader("User-Agent", userAgent); - httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); - httpPost.addHeader("Cookie", msgSessionCookie); - String md5Avro = MD5Utils.md5Encode(results); - httpPost.addHeader("Checksum", md5Avro); - LOG.info("批量发送body Checksum MD5 为:" + md5Avro); - httpPost.addHeader("Content-Type", "binary/octet-stream"); - httpPost.addHeader("X-Tag", cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil.getXTAG(dataJson, topic)); - ByteArrayEntity payload = new ByteArrayEntity(results); - payload.setContentEncoding("utf-8"); - httpPost.setEntity(payload); - long startTime = System.currentTimeMillis(); - response = httpClient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime)); - if (statusCode != HttpStatus.SC_OK) { - AvroMonitorTimerTask.addSuccessNum(batchSize); - LOG.info("数据总线响应内容:" + msg); - } else { - AvroMonitorTimerTask.addFailedNum(batchSize); - LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg); - - switch (statusCode) { - case 300: - LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" - + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求."); - //若不包含对应字段,则不进行对象转换,减少报错 - if (msg.contains("redirect")) { - ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class); - String redirectUrl = resRedirBody.getData().getRedirect(); - if (StringUtils.isNotBlank(redirectUrl)) { - YbHttpAvroSinkFile.changeUrl(redirectUrl); - } - } else { - LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!"); - } - - - break; - case 301: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode + - ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待."); - break; - case 410: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode - + "服务端响应时间(ms)==>" + "<==," - + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie."); - YbHttpAvroSinkFile.updateCookie(); - break; - case 500: - LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode - + "服务端响应时间(ms)==>" + "<==," - + ",resRedirBodyCode:500,处理请求过程出现系统错误."); - YbHttpAvroSinkFile.updateCookie(); - break; - default: - LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==," - + "---Status Code:" + statusCode); - break; - } - - } - - - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e); - } catch (ParseException e) { - LOG.error("解析错误: {}", e); - } catch (IOException e) { - LOG.error("IO错误: {}", e); - } catch (Exception e) { - LOG.error("其它错误: {}", e); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e); - } - } - } - - } - -} - diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java index 2ca287e..63a6d72 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java @@ -16,20 +16,16 @@ import org.apache.flume.sink.AbstractSink; import org.apache.log4j.Logger; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private static Logger logger = Logger.getLogger(YbHttpAvroSinkFile.class); - protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM); - private static ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(); private static DataCenterLoad dcl; - private static String postMsgUrl;//发送消息路径,配置文件获取,发送文件与发送消息皆需要 + private static String singleProduceUrl;//单条发送接口 + private static String batchProduceUrl; //批量发送接口 private String postFileUrl;//发送文件路径,配置文件获取,仅发送文件时需要---若只发送消息,则此路径与postMsgUrl设置相同即可 @@ -61,14 +57,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private static boolean checkTimerStart = false;//定时获取Cookie启动器 - private static boolean redirectContentsPostStart = false;//定时post重定向数据集合 - private static int batchInsertNum = RealtimeCountConfig.BATCH_INSERT_NUM; - /** - * 用于存储由于服务器资源不足所造成的未发送数据 - */ - public static List redirectContents; + /** * 用于存放验证以及连接的url的各组成部分,方便调取 @@ -84,35 +75,45 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { super.start(); dcl = new DataCenterLoad(); - redirectContents = new ArrayList<>();//初始化 /** * 拆解初始化获取的url后缀,用于填充urlToolHm,用于后续动态负载均衡中的url变更 */ - if (StringUtils.isNotBlank(checkMsgUrl) && StringUtils.isNotBlank(postMsgUrl)) { + if (StringUtils.isNotBlank(checkMsgUrl) + && StringUtils.isNotBlank(singleProduceUrl) + && StringUtils.isNotBlank(batchProduceUrl)) { urlToolHm = new HashMap<>(); makeUrlSplitMap(checkMsgUrl, "check"); - makeUrlSplitMap(postMsgUrl, "post"); + makeUrlSplitMap(singleProduceUrl, "post_single"); + makeUrlSplitMap(batchProduceUrl, "post_batch"); + } else { logger.error("Starting YbHttpAvroSinkFile is error==>checkMsgUrl and postMsgUrl can not be null!!!!"); } - logger.warn("启动Sink File 执行程序 =============="); - //new Thread(new Consumer()).start(); - logger.warn("开启多线程消费队列数据=================="); - + new Thread(new RetrySendMessage()).start(); logger.warn("Starting YbHttpAvroSinkFile ... ..."); } @Override public void configure(Context context) { try { - postMsgUrl = context.getString("postMsgUrl", ""); - Preconditions.checkNotNull("".equals(postMsgUrl), "postMsgUrl must be set!!"); - logger.info("Read Post Message URL from configuration : " + postMsgUrl); + singleProduceUrl = context.getString("singleProduceUrl", ""); + Preconditions.checkNotNull("".equals(singleProduceUrl), "singleProduceUrl must be set!!"); + logger.info("Read Post Message URL from configuration : " + singleProduceUrl); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Endpoint Message URL invalid", e); } catch (Exception e) { - logger.error("Get postMsgUrl is error : " + e); + logger.error("Get singleProduceUrl is error : " + e); + } + + try { + batchProduceUrl = context.getString("batchProduceUrl", ""); + Preconditions.checkNotNull("".equals(batchProduceUrl), "batchProduceUrl must be set!!"); + logger.info("Read Post Message URL from configuration : " + batchProduceUrl); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Endpoint Message URL invalid", e); + } catch (Exception e) { + logger.error("Get batchProduceUrl is error : " + e); } try { @@ -178,7 +179,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); - Transaction transaction = null; + Transaction transaction = null; try { logger.debug("Current Process Thread number ID :" + Thread.currentThread().getId()); transaction = channel.getTransaction(); @@ -200,26 +201,23 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { switch (topicName) { //作为单条发送-新分类-20191219 /** - * 非文件消息 + * 非文件消息:NTC-CONN-RECORD-LOG,NTC-COLLECT-DNS-LOG,NTC-COLLECT-SSL-LOG + * 文件类消息:NTC-COLLECT-FILE-LOG(有独立文件标签)、 + * NTC-COLLECT-HTTP-DOC-LOG、NTC-COLLECT-HTTP-AV-LOG、NTC-COLLECT-FTP-DOC-LOG、 + * NTC-COLLECT-MAIL-LOG、NTC-COLLECT-TELNET-LOG + * 状态消息:INFLUX-SAPP-BPS-STAT-LOG(读取回写的influxDB合计数据用作状态上传) */ case "NTC-CONN-RECORD-LOG": case "NTC-COLLECT-DNS-LOG": case "NTC-COLLECT-SSL-LOG": - /** - * 文件消息 - */ - case "NTC-COLLECT-FILE-LOG"://发送独立出来的文件标签 + case "NTC-COLLECT-FILE-LOG": case "NTC-COLLECT-HTTP-DOC-LOG": - case "NTC-COLLECT-HTTP-AV-LOG"://schema等同于NTC-COLLECT-HTTP-DOC-LOG + case "NTC-COLLECT-HTTP-AV-LOG": case "NTC-COLLECT-FTP-DOC-LOG": case "NTC-COLLECT-MAIL-LOG": case "NTC-COLLECT-TELNET-LOG": - /** - * 状态消息 - */ - case "INFLUX-SAPP-BPS-STAT-LOG"://读取回写的influxDB合计数据用作状态上传 - sendMsgLog(transaction, contents);//20191209移除文件发送,仅处理消息 - //sendMsgController(transaction, contents); + case "INFLUX-SAPP-BPS-STAT-LOG": + sendMsgLog(transaction, contents); break; default: logger.error("YbHttpAvroSinkFile can't find this topic:" + topicName + ".Please confirm this topicName is correct!!!"); @@ -241,7 +239,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } finally { if (transaction != null) { transaction.close(); - logger.debug("close Transaction"); } } return result; @@ -260,7 +257,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } if (statusCode == 200) { if (StringUtils.isNotBlank(acCheckMsgResBody.getSessionId())) { - logger.warn("AC msg successfully,msg sessionId is ===>" + acCheckMsgResBody.getSessionId()); + logger.info("AC msg successfully,msg sessionId is ===>" + acCheckMsgResBody.getSessionId()); msgSessionCookie = acCheckMsgResBody.getSessionId(); } } else if (statusCode == 0) { @@ -269,8 +266,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { logger.error("AC msg from ZX is error,statusCode is " + statusCode + "(case)=" + acCheckMsgResBody.getCode() + "(getMethod)<==="); logger.error("This " + statusCode + " ResponseBody(contain sessionId) is ===>" + acCheckMsgResBody.toString() + "<==="); } - updateConfigInfo();//getMsgSessionCookie()更新 -// return producerResBody; + updateConfigInfo(); } /** @@ -301,7 +297,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { private static ConfigInfo updateConfigInfo() { configInfo.setCheckMsgUrl(checkMsgUrl); - configInfo.setPostMsgUrl(postMsgUrl); + configInfo.setSingleProduceUrl(singleProduceUrl); + configInfo.setBatchProduceUrl(batchProduceUrl); + configInfo.setMsgSessionCookie(msgSessionCookie); configInfo.setMonitorSessionCookie(monitorSessionCookie);//缓存monitorSessionCookie @@ -314,147 +312,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { return configInfo; } - - /** - * 发送消息控制器 - * @param transaction - * @param contents - */ - private void sendMsgController(Transaction transaction, List contents) { - try { - AvroMonitorTimerTask.addTotalNum(contents.size()); - AvroMonitorTimerTask.addReadyPostNum(contents.size()); - /** - * 获取状态回传sessionID,检查认证是否存在 - */ - if (StringUtils.isBlank(monitorSessionCookie) - || StringUtils.isBlank(msgSessionCookie)) { - getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie - getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证 - if (!checkTimerStart) { - checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie - checkTimerStart = true; - logger.warn("CheckMsgAndFileCookie Timer is started......"); - } - - if (!monitorStart) {//消息定时上报 - AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl, - "monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息 - monitorStart = true; - logger.warn("MonitorMsg Timer is started......"); - } - - - if (!redirectContentsPostStart) { - postRedirectDataEveryMin(); - redirectContentsPostStart = true; - logger.warn("RedirectContents Timer Post is started......"); - } - } - - - if (isSingle(topicName)) { - - for (String content : contents) { - pool.execute(new Runnable() { - @Override - public void run() { - try { - - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, - userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 - } catch (Exception e) { - logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); - } - } - }); - } - - } else { - xTag = HttpClientUtil.getXTAG(contents.get(0), topicName); - dataJson = contents.get(0); - int size = contents.size() / 100; - if (size > 0) { - for (int i = 0; i < size; i++) { - // byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents.subList(i*1, (i+1)*1)); - //concurrentLinkedQueue.add(msgResults); - pool.execute(new Producer(contents.subList(i * 100, (i + 1) * 100))); - } - if (contents.size() % 100 > 0) { - pool.execute(new Producer(contents.subList(size, contents.size()))); - } - } else { - pool.execute(new Producer(contents)); - } - } - - } catch (Exception e) { - logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); - transaction.commit(); - } finally { - if (transaction != null) { - transaction.commit(); - } - AvroMonitorTimerTask.subReadyPostNum(contents.size()); - } - - - - - - } - - - - - class Producer implements Runnable { - - private List contents; - public Producer(List contents) { ; - this.contents = contents; - } - @Override - public void run() { - try { - logger.debug("Current Producer Thread number ID :" + Thread.currentThread().getId()); - byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); - concurrentLinkedQueue.add(msgResults); - - } catch (Exception e) { - logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); - - } - - - } - } - - - class Consumer implements Runnable { - - @Override - public void run() { - logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId()); - while(true) { - - if (concurrentLinkedQueue.isEmpty()) { - logger.info("当前队列无数据,等待数据接入!"); - } else { - byte[] result = concurrentLinkedQueue.poll(); - HttpClientUtil.asyncProducerAvroMessageToZX_toBatch(postMsgUrl, topicName, result, dataJson, - userAgent, msgSessionCookie, 100, System.currentTimeMillis());//初始发送count计数为0 - logger.info("生产数据,等待数据接入!"); - } - } - } - } - - - /** * 往zx发送文件数据的消息,即发送文件的message数据(结构化数据) * 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224 - * * @param transaction * @param contents */ @@ -462,7 +322,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { try { - logger.info("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId()); + logger.debug("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId()); /** * 获取状态回传sessionID,检查认证是否存在 @@ -478,33 +338,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } if (!monitorStart) {//消息定时上报 - AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl, + AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, singleProduceUrl, "monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息 monitorStart = true; logger.debug("MonitorMsg Timer is started......"); } - if (!redirectContentsPostStart) { - postRedirectDataEveryMin(); - redirectContentsPostStart = true; - logger.debug("RedirectContents Timer Post is started......"); - } + logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); } - AvroMonitorTimerTask.addTotalNum(contents.size()); - logger.debug("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie); AvroMonitorTimerTask.addReadyPostNum(contents.size()); if (isSingle(topicName)) { for (String content : contents) { - pool.execute(new Runnable() { @Override public void run() { try { - - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, + HttpClientUtil.singleAsyncProduceMessage(singleProduceUrl, topicName, content, userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0 } catch (Exception e) { logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); @@ -514,61 +366,30 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { } } else { long beginAvroTime = System.currentTimeMillis(); + xTag = HttpClientUtil.getXTAG(contents.get(0), topicName); byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents); - logger.debug("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) ); - HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), + + logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) ); + HttpClientUtil.batchAsyncProduceMessage(batchProduceUrl, topicName, msgResults, xTag, userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis()); - /* for (String content : contents) { - HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0 - }*/ - - - /*for (String content : contents) { - - pool.execute(new Runnable() { - @Override - public void run() { - try { - - HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis()); - } catch (Exception e) { - logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); - } - } - }); - }*/ - - /* for (String content : contents) { - - pool.execute(new Runnable() { - @Override - public void run() { - try { - SinkService.getInstance().producerAvroMessageToBus(postMsgUrl, topicName, msgResults, contents.get(0), - userAgent, msgSessionCookie, contents.size()); - } catch (Exception e) { - logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==."); - } - } - }); - }*/ - } } catch (Exception e) { - logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<==="); + logger.error("YbHttpAvroSinkFile Send Msg is error===>" + e + "<==="); transaction.commit(); - } finally { - if (transaction != null) { - transaction.commit(); - } - AvroMonitorTimerTask.subReadyPostNum(contents.size()); - } + } finally { + if (transaction != null) { + transaction.commit(); + } + AvroMonitorTimerTask.subReadyPostNum(contents.size()); + } } - + /** + * 单条消息发送处理逻辑 + * @param topicName + * @return + */ private static boolean isSingle(String topicName) { if (topicName.equals("NTC-COLLECT-FILE-LOG") @@ -619,31 +440,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { }, 1000 * 60 * 60 * 24 * 7, 1000 * 60 * 60 * 24 * 7);//每隔7天执行一次 } - /** - * 重定向数据集上传定时器,每隔一段时间扫描并上传一次-用于上传因为服务器资源紧张暂未上传的数据 - */ - private void postRedirectDataEveryMin() { - Timer timer = new Timer(); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - if (redirectContents.size() > 0) { - List tmpListFreq = new ArrayList<>(redirectContents); - redirectContents.clear(); - AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size(); - for (String content : tmpListFreq) { - HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0 - } - logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==."); - } - } catch (Exception e) { - logger.error("PostRedirectDataEveryMin to zx everyMin is error===>>>" + e + "<==="); + + class RetrySendMessage implements Runnable { + + @Override + public void run() { + logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId()); + while(true) { + if (!HttpClientUtil.getRetryQueue().isEmpty()) { + logger.info("当前队列大小:" + HttpClientUtil.getRetryQueue().size()); + HttpClientUtil.singleAsyncProduceMessage(singleProduceUrl, topicName, + HttpClientUtil.getRetryQueue().poll(), userAgent, msgSessionCookie, 1, System.currentTimeMillis()); + } } - }, 1000 * 60, 1000 * 60);//每隔1分钟执行一次 + } } + + /** * 动态负载均衡变更cookie-20200818 * @@ -654,7 +469,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { /** * 变更postMsgUrl与checkMsgUrl */ - postMsgUrl = redirectUrlPort + urlToolHm.get("post_suf_path"); + singleProduceUrl = redirectUrlPort + urlToolHm.get("post_single_suf_path"); + batchProduceUrl = redirectUrlPort + urlToolHm.get("post_batch_suf_path"); checkMsgUrl = redirectUrlPort + urlToolHm.get("check_suf_path"); /** @@ -662,9 +478,14 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { */ updateCookie(); - logger.info("YbHttpAvroSinkFile->changeUrl->change postMsgUrl:" + postMsgUrl + ",change checkMsgUrl:" + checkMsgUrl); + logger.warn("YbHttpAvroSinkFile->changeUrl->" + + "change singleProduceUrl:" + singleProduceUrl + + "change batchProduceUrl:" + batchProduceUrl + + ",change checkMsgUrl:" + checkMsgUrl); } + + /** * 动态负载均衡更新cookie */ @@ -672,7 +493,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { getMonitorSessionCookie();//动态负载均衡修改url,重新获取cookie getMsgSessionCookie();//动态负载均衡修改url,重新获取cookie - logger.info("YbHttpAvroSinkFile->updateCookie update cookie,postMsgUrl:" + postMsgUrl + logger.warn("YbHttpAvroSinkFile->updateCookie update cookie,singleProduceUrl:" + singleProduceUrl + + "batchProduceUrl:" + batchProduceUrl + ",checkMsgUrl:" + checkMsgUrl + ",获取monitorSessionCookie:" + monitorSessionCookie + ",获取msgSessionCookie:" + msgSessionCookie); @@ -685,7 +507,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable { * @param urlType */ private static void makeUrlSplitMap(String oldUrlPath, String urlType) { - String[] split = oldUrlPath.replace("http://", "").replace("https://", "").split("/", 2); + String[] split = oldUrlPath.replace("http://", "") + .replace("https://", "").split("/", 2); if (split.length == 2) { /* diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java index d62c81a..7d5ae66 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java @@ -88,7 +88,7 @@ public class AvroMonitorTimerTask { if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报 String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428 logger.info("Send monitor message is===>>>" + sendMsg + "<<<==="); - HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0 + HttpClientUtil.singleAsyncProduceMessage(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0 } } catch (Exception e) { logger.error("Send monitorMsg to zx is error===>>>" + e + "<==="); diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java index e479b4d..2c1372e 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java @@ -32,6 +32,7 @@ import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.*; @@ -41,6 +42,8 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.LaxRedirectStrategy; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; @@ -171,6 +174,21 @@ public class HttpManager { // 声明重定向策略对象 LaxRedirectStrategy redirectStrategy = new LaxRedirectStrategy(); + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && param.equalsIgnoreCase("timeout")) { + return Long.parseLong(value) * 1000; + } + } + return 10 * 1000;//如果没有约定,则默认定义时长为60s + }; + /** * 原版 */ @@ -179,6 +197,7 @@ public class HttpManager { .setDefaultRequestConfig(requestConfig) .setRedirectStrategy(redirectStrategy) .setRetryHandler(myRetryHandler) + .setKeepAliveStrategy(myStrategy) .build(); return httpClient; @@ -629,6 +648,7 @@ public class HttpManager { httpPost.addHeader("User-Agent", userAgent); httpPost.addHeader("X-Tag", xTag);//根据最新文档,目前已经不需要此头-20191217 httpPost.addHeader("Content-Type", "application/json"); + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); StringEntity payload = new StringEntity(data, Charset.forName("utf-8")); //payload.setContentType("text/xml; charset=UTF-8"); payload.setContentEncoding("utf-8"); @@ -671,14 +691,15 @@ public class HttpManager { // 执行httpClient.execute(httpGet)的异常 e.printStackTrace(); } finally { - if (response != null) { + if (null != response) { try { + EntityUtils.consumeQuietly(response.getEntity()); response.close(); } catch (IOException e) { - e.printStackTrace(); + logger.error("释放链接错误: " + e.getMessage()); } } - httpPost.abort(); + //httpPost.abort(); } return acResBody; } diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java index cd23d28..4da6319 100644 --- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java +++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java @@ -9,7 +9,8 @@ package cn.ac.iie.cusflume.sink.bean.configBean; */ public class ConfigInfo { - private String postMsgUrl; + private String singleProduceUrl; + private String batchProduceUrl; private String postFileUrl; private String checkMsgUrl; @@ -26,12 +27,20 @@ public class ConfigInfo { private String xTag; private int batchSize; - public String getPostMsgUrl() { - return postMsgUrl; + public String getSingleProduceUrl() { + return singleProduceUrl; } - public void setPostMsgUrl(String postMsgUrl) { - this.postMsgUrl = postMsgUrl; + public void setSingleProduceUrl(String singleProduceUrl) { + this.singleProduceUrl = singleProduceUrl; + } + + public String getBatchProduceUrl() { + return batchProduceUrl; + } + + public void setBatchProduceUrl(String batchProduceUrl) { + this.batchProduceUrl = batchProduceUrl; } public String getPostFileUrl() { @@ -126,7 +135,8 @@ public class ConfigInfo { @Override public String toString() { return "ConfigInfo{" + - "postMsgUrl='" + postMsgUrl + '\'' + + ", singleProduceUrl='" + singleProduceUrl + '\'' + + ", batchProduceUrl='" + batchProduceUrl + '\'' + ", postFileUrl='" + postFileUrl + '\'' + ", checkMsgUrl='" + checkMsgUrl + '\'' + ", checkFileUrl='" + checkFileUrl + '\'' + diff --git a/yb_http_avro_sink_file/src/main/resources/flume_config.properties b/yb_http_avro_sink_file/src/main/resources/flume_config.properties index a1bc883..4530cc2 100644 --- a/yb_http_avro_sink_file/src/main/resources/flume_config.properties +++ b/yb_http_avro_sink_file/src/main/resources/flume_config.properties @@ -2,19 +2,19 @@ http.async.parallelism=10 #异步Http客户端-等待数据超时时间,根据业务调整 -http.async.socketTimeout=10000 +http.async.socketTimeout=60000 #异步Http客户端-连接超时时间 -http.async.connectTimeout=10000 +http.async.connectTimeout=30000 #异步Http客户端-连接池最大连接数 -http.async.poolSize=80 +http.async.poolSize=4 #异步Http客户端-每个主机的并发最多只有1500 -http.async.maxPerRoute=80 +http.async.maxPerRoute=4 #异步Http客户端-从连接池中后去连接的timeout时间 -http.async.connectionRequestTimeout=15000 +http.async.connectionRequestTimeout=30000 #Schema配置信息