diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java index fb64b81..28e113d 100644 --- a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java +++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java @@ -42,7 +42,7 @@ public class ReadHistoricalDruidData implements Callable ATTACK_TYPE_LIST = Arrays.asList( ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD // ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD, @@ -36,6 +35,7 @@ public class BaselineGeneration { private static final Tuple2 START_END_TIMES = DruidData.getTimeLimit(); private static final Map>> allFromDruid = new HashMap<>(); + private static int threadNum = ApplicationConfig.THREAD_MAX_NUM; /** * 程序执行 @@ -44,41 +44,39 @@ public class BaselineGeneration { long start = System.currentTimeMillis(); try{ - // baseline生成并写入 - generateBaselinesThread(); - - long last = System.currentTimeMillis(); - LOG.warn("运行时间:" + (last - start)); - + loadFromDruid(); + baselineGenration(); hbaseTable.close(); - LOG.info("Druid 关闭连接"); - } catch (Exception e){ e.printStackTrace(); + } finally { + long last = System.currentTimeMillis(); + LOG.warn("运行时间:" + (last - start)); } System.exit(0); } /** - * 多线程baseline生成入口 + * Druid数据读取 * @throws InterruptedException */ - private void generateBaselinesThread() throws InterruptedException { - int threadNum = ApplicationConfig.THREAD_MAX_NUM; - - // 数据读取 - LOG.info("Druid 开始读取数据"); + private void loadFromDruid() throws InterruptedException { + LOG.info("开始读取数据"); long start = System.currentTimeMillis(); + + long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR; + int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad); + ArrayList>>>> resultList = new ArrayList<>(); + CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum); + ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-load-data-%d").build(); ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor( threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR; - int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad); - ArrayList>>>> resultList = new ArrayList<>(); - CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum); + + // 按ip数分区 for (int i = 0; i < threadPoolNum; i++) { String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad); ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData( @@ -91,6 +89,7 @@ public class BaselineGeneration { loadDataExecutor.shutdown(); loadDataCountDownLatch.await(); + // 返回结果合并 for(Future>>> future: resultList){ try { Map>> queryBatchIpData = future.get(); @@ -104,10 +103,18 @@ public class BaselineGeneration { e.printStackTrace(); } } + LOG.info("本次共查询到服务端ip个数:" +allFromDruid.size()); + LOG.info("查询范围: " + START_END_TIMES._1 + " - " + START_END_TIMES._2); + long last = System.currentTimeMillis(); LOG.info("Druid 加载数据共耗时:"+(last-start)); + } - // BaseLine生成 + /** + * Baseline生成及写入 + * @throws InterruptedException + */ + private static void baselineGenration() throws InterruptedException { ThreadFactory generationThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-generate-%d").build(); ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor( @@ -115,10 +122,8 @@ public class BaselineGeneration { TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - LOG.info("共查询到服务端ip " +allFromDruid.size() + " 个"); LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - List>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); CountDownLatch generateCountDownLatch = new CountDownLatch(batchDruidDataLists.size()); for (Map>>batchDruidData: batchDruidDataLists){ diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index a1cdfc2..9863770 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -63,12 +63,11 @@ public class BaselineSingleThread extends Thread { } try { hbaseTable.put(putList); - LOG.info(" 成功写入Baseline条数共计 " + putList.size()); } catch (IOException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); - LOG.info("本线程处理完毕,剩余线程数量:" + countDownLatch.getCount()); + LOG.info("成功写入Baseline条数共计 " + putList.size() + " 剩余线程数量:" + countDownLatch.getCount()); } } diff --git a/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java b/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java new file mode 100644 index 0000000..bb0e8f5 --- /dev/null +++ b/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java @@ -0,0 +1,67 @@ +package cn.mesalab.utils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.omg.PortableInterceptor.INACTIVE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; + +public class ExecutorThreadPool { + private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class); + private static ExecutorService pool = null ; + private static ExecutorThreadPool poolExecutor = null; + private int threadPoolNum; + + static { + getThreadPool(); + } + + public ExecutorThreadPool(Integer threadPoolNum){ + this.threadPoolNum. + } + + private static void getThreadPool(){ + ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("iplearning-application-pool-%d").build(); + pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER, + 0L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + } + + public static ExecutorThreadPool getInstance(){ + if (null == poolExecutor){ + poolExecutor = new ExecutorThreadPool(); + } + return poolExecutor; + } + + public void executor(Runnable command){ + pool.execute(command); + } + + @Deprecated + public void awaitThreadTask(){ + try { + while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { + LOG.warn("线程池没有关闭"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void shutdown(){ + pool.shutdown(); + } + + @Deprecated + public static Long getThreadNumber(){ + String name = Thread.currentThread().getName(); + String[] split = name.split("-"); + return Long.parseLong(split[3]); + } + + + +} diff --git a/src/main/java/cn/mesalab/utils/HttpClientUtils.java b/src/main/java/cn/mesalab/utils/HttpClientUtils.java deleted file mode 100644 index b5b7382..0000000 --- a/src/main/java/cn/mesalab/utils/HttpClientUtils.java +++ /dev/null @@ -1,485 +0,0 @@ -package cn.mesalab.utils; - -import cn.mesalab.config.ApplicationConfig; -import com.google.common.collect.Maps; -import org.apache.http.*; -import com.zdjizhi.utils.StringUtil; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpRequestRetryHandler; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.message.BasicHeaderElementIterator; -import org.apache.http.protocol.HTTP; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.PostConstruct; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; - -/** - * @author yjy - * @version 1.0 - * @date 2021/8/3 3:57 下午 - */ - -public class HttpClientUtils { - private static final Logger LOG = LoggerFactory.getLogger(HttpClientUtils.class); - - //全局连接池对象 - private PoolingHttpClientConnectionManager connectionManager; - - /** - * 初始化连接池信息 - */ - @PostConstruct - public void initConnectionManager() { - if (connectionManager == null) { - connectionManager = new PoolingHttpClientConnectionManager(); - // 整个连接池最大连接数 - connectionManager.setMaxTotal(ApplicationConfig.HTTP_MAX_CONNECTION_NUM); - // 每路由最大连接数,默认值是2 - connectionManager.setDefaultMaxPerRoute(ApplicationConfig.HTTP_MAX_PER_ROUTE); - } - LOG.info("Initializing PoolingHttpClientConnectionManager Complete"); - } - - /** - * 获取Http客户端连接对象 - * - * @param socketTimeOut 响应超时时间 - * @return Http客户端连接对象 - */ - public CloseableHttpClient getHttpClient(int socketTimeOut) { - // 创建Http请求配置参数 - RequestConfig requestConfig = RequestConfig.custom() - // 获取连接超时时间 - .setConnectionRequestTimeout(ApplicationConfig.HTTP_CONNECTION_TIMEOUT) - // 请求超时时间 - .setConnectTimeout(ApplicationConfig.HTTP_REQUEST_TIMEOUT) - // 响应超时时间 - .setSocketTimeout(socketTimeOut) - .build(); - - /** - * 测出超时重试机制为了防止超时不生效而设置 - * 如果直接放回false,不重试 - * 这里会根据情况进行判断是否重试 - */ - HttpRequestRetryHandler retry = (exception, executionCount, context) -> { - if (executionCount >= 3) {// 如果已经重试了3次,就放弃 - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - if (!(request instanceof HttpEntityEnclosingRequest)) { - return true; - } - return false; - }; - - - ConnectionKeepAliveStrategy myStrategy = (response, context) -> { - HeaderElementIterator it = new BasicHeaderElementIterator - (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); - while (it.hasNext()) { - HeaderElement he = it.nextElement(); - String param = he.getName(); - String value = he.getValue(); - if (value != null && param.equalsIgnoreCase("timeout")) { - return Long.parseLong(value) * 1000; - } - } - return 60 * 1000;//如果没有约定,则默认定义时长为60s - }; - - // 创建httpClient - return HttpClients.custom() - // 把请求相关的超时信息设置到连接客户端 - .setDefaultRequestConfig(requestConfig) - // 把请求重试设置到连接客户端 - .setRetryHandler(retry) - .setKeepAliveStrategy(myStrategy) - // 配置连接池管理对象 - .setConnectionManager(connectionManager) - .build(); - } - - /** - * Desc: 发起http delete请求,返回status code与response body - * @param url - * @param socketTimeout - * @return {@link Map< String, String>} - * @created by wWei - * @date 2021/1/8 3:29 下午 - */ - public Map httpDelete(String url, int socketTimeout) { - Map resultMap = Maps.newHashMap(); - // 创建GET请求对象 - CloseableHttpResponse response = null; - try { - HttpDelete httpDelete = new HttpDelete(url); - // 执行请求 - response = getHttpClient(socketTimeout).execute(httpDelete); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); - resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (ParseException e) { - LOG.error("解析错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (IOException e) { - LOG.error("IO错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); - resultMap.put("message", e.getMessage()); - } catch (Exception e) { - LOG.error("其它错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - resultMap.put("message", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e.getMessage()); - } - } - } - return resultMap; - } - - /** - * 返回status code与response body - * @param url:请求地址 - * @param socketTimeout: 响应超时时间 - * - **/ - public Map httpGet(String url, int socketTimeout) { - Map resultMap = Maps.newHashMap(); - // 创建GET请求对象 - CloseableHttpResponse response = null; - try { - HttpGet httpGet = new HttpGet(url); - // 执行请求 - response = getHttpClient(socketTimeout).execute(httpGet); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); - resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); - } catch (ClientProtocolException e) { - LOG.error("ClientProtocolException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (ParseException e) { - LOG.error("ParseException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (IOException e) { - LOG.error("IOException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); - resultMap.put("message", e.getMessage()); - } catch (Exception e) { - LOG.error("Exception:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - resultMap.put("message", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("CloseConnectionException:{}", e.getMessage()); - } - } - } - return resultMap; - } - - /** - * 返回status code与response body - * @param url:请求地址 - * @param headers: Headers - * @param socketTimeOut: 响应超时时间 - * @return: java.util.Map - **/ - public Map httpGet(String url, Map headers, int socketTimeOut) { - Map resultMap = Maps.newHashMap(); - // 创建GET请求对象 - CloseableHttpResponse response = null; - try { - HttpGet httpGet = new HttpGet(url); - for (String key : headers.keySet()) { - httpGet.setHeader(key, headers.get(key)); - } - // 执行请求 - response = getHttpClient(socketTimeOut).execute(httpGet); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); - resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); - } catch (ClientProtocolException e) { - LOG.error("ClientProtocolException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (ParseException e) { - LOG.error("ParseException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (IOException e) { - LOG.error("IOException:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); - resultMap.put("message", e.getMessage()); - } catch (Exception e) { - LOG.error("Exception:{}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - resultMap.put("message", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("CloseConnectionException:{}", e.getMessage()); - } - } - } - return resultMap; - } - - /** - * 返回status code与response body - * @param url:请求地址 - * @param jsonString:请求参数 - * @param socketTimeOut:响应超时时间 - **/ - public Map httpPost(String url, String jsonString, int socketTimeOut) { - Map resultMap = Maps.newHashMap(); - // 创建GET请求对象 - CloseableHttpResponse response = null; - try { - HttpPost httpPost = new HttpPost(url); - httpPost.setHeader("Content-Type", "application/json"); - httpPost.setEntity(new ByteArrayEntity(jsonString.getBytes("utf-8"))); - response = getHttpClient(socketTimeOut).execute(httpPost); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); - resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (ParseException e) { - LOG.error("解析错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); - resultMap.put("message", e.getMessage()); - } catch (IOException e) { - LOG.error("IO错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); - resultMap.put("message", e.getMessage()); - } catch (Exception e) { - LOG.error("其它错误: {}", e.getMessage()); - resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - resultMap.put("message", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e.getMessage()); - } - } - } - return resultMap; - } - - /** - * 返回status code与response body - * @param url:请求地址 - * @param headers: Headers - * @param socketTimeOut: 响应超时时间 - **/ - public Map getHttpPostResponseHeads(String url, Map headers, int socketTimeOut) { - CloseableHttpResponse response = null; - HashMap map = Maps.newHashMap(); - try { - HttpPost httpPost = new HttpPost(url); - for (Object k : headers.keySet()) { - httpPost.setHeader(k.toString(), headers.get(k).toString()); - } - response = getHttpClient(socketTimeOut).execute(httpPost); - Header[] Headers = response.getAllHeaders(); - for (Header h : Headers) { - map.put(h.getName().toUpperCase(), h.getValue()); - } - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - LOG.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - LOG.error("IO错误: {}", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e.getMessage()); - } - } - } - return map; - } - - /** - * @param url:请求地址 - **/ - public String httpGet(String url) { - String msg = "-1"; - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT); - CloseableHttpResponse response = null; - try { - URL ul = new URL(url); - URI uri = new URI(ul.getProtocol(), null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null); - LOG.info("http get uri {}", uri); - // 创建GET请求对象 - HttpGet httpGet = new HttpGet(uri); - // 执行请求 - response = httpClient.execute(httpGet); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - if (statusCode != HttpStatus.SC_OK) { - LOG.error("Http get content is :" + msg); - System.exit(1); - } - } catch (URISyntaxException e) { - LOG.error("URI 转换错误: {}", e.getMessage()); - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - LOG.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - LOG.error("IO错误: {}", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consume(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e.getMessage()); - } - } - } - return msg; - } - - /** - * @param url: 请求地址 - * @param requestBody: 请求参数 - * @param headers: Header - **/ - public String httpPost(String url, String requestBody, Header... headers) { - String msg = "-1"; - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT); - // 创建POST请求对象 - CloseableHttpResponse response = null; - try { - - URL ul = new URL(url); - URI uri = new URI(ul.getProtocol(), null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null); - LOG.debug("http post uri:{}, http post body:{}", uri, requestBody); - HttpPost httpPost = new HttpPost(uri); - httpPost.setHeader("Content-Type", "application/json"); - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpPost.addHeader(h); - } - } - if (StringUtil.isNotBlank(requestBody)) { - httpPost.setEntity(new ByteArrayEntity(requestBody.getBytes("utf-8"))); - } - response = httpClient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED) { - LOG.error(msg); - System.exit(1); - } - } catch (URISyntaxException e) { - LOG.error("URI 转换错误: {}", e.getMessage()); - } catch (ClientProtocolException e) { - LOG.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - LOG.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - LOG.error("IO错误: {}", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - LOG.error("释放链接错误: {}", e.getMessage()); - } - } - } - return msg; - } -}