diff --git a/config/application.yml b/config/application.yml index a2f9d98..613ad62 100644 --- a/config/application.yml +++ b/config/application.yml @@ -2,7 +2,7 @@ nacos: config: type: yaml server-addr: 192.168.44.12:8848 - namespace: dev + namespace: test data-id: p19-file-sync-service auto-refresh: true group: Galaxy diff --git a/pom.xml b/pom.xml index d894412..06c66dc 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi p19-file-sync-service - 23.03.09 + 23.09.26 p19-file-sync-service @@ -104,13 +104,11 @@ hutool-all 5.5.7 - org.springframework.boot spring-boot-starter-test test - org.apache.httpcomponents httpclient @@ -136,13 +134,11 @@ commons-io 2.4 - com.alibaba.boot nacos-config-spring-boot-starter ${nacos.config.version} - org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java index 2903d63..0acbdd0 100644 --- a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java +++ b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java @@ -10,12 +10,13 @@ import org.apache.http.NoHttpResponseException; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; -import org.springframework.beans.factory.annotation.Qualifier; +import org.apache.http.ssl.SSLContextBuilder; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @@ -77,109 +78,71 @@ public class HttpClientPool { this.retryNum = retryNum; } - /** - * 首先实例化一个连接池管理器,设置最大连接数、并发连接数 - * - * @return - */ - @Bean(name = "httpClientConnectionManager") - public PoolingHttpClientConnectionManager getHttpClientConnectionManager() { - PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(); - //最大连接数 - httpClientConnectionManager.setMaxTotal(maxTotal); - //并发数 - httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute); - return httpClientConnectionManager; - } - - /** - * 实例化连接池,设置连接池管理器。 - * 这里需要以参数形式注入上面实例化的连接池管理器 - * - * @param httpClientConnectionManager - * @return - */ - @Bean(name = "httpClientBuilder") - public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) { - //HttpClientBuilder中的构造方法被protected修饰,所以这里不能直接使用new来实例化一个HttpClientBuilder,可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象 - HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); - httpClientBuilder.setConnectionManager(httpClientConnectionManager); - return httpClientBuilder; - } - - /** - * 注入连接池,用于获取httpClient - * - * @param httpClientBuilder - * @return - */ - @Bean - public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder,@Qualifier("httpRetryHandler") HttpRequestRetryHandler httpRetryHandler){ - return httpClientBuilder - .setRetryHandler(httpRetryHandler) - .build(); - } - - /** - * Builder是RequestConfig的一个内部类 - * 通过RequestConfig的custom方法来获取到一个Builder对象 - * 设置builder的连接信息 - * 这里还可以设置proxy,cookieSpec等属性。有需要的话可以在此设置 - * - * @return - */ - @Bean(name = "builder") - public RequestConfig.Builder getBuilder() { - RequestConfig.Builder builder = RequestConfig.custom(); - return builder.setConnectTimeout(connectTimeout) - .setConnectionRequestTimeout(connectionRequestTimeout) - .setSocketTimeout(socketTimeout) - .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled); + @Bean(name = "httpClient") + public CloseableHttpClient getCloseableHttpClient(){ + CloseableHttpClient httpClient = null; + try { + HttpRequestRetryHandler httpRetryHandler = new HttpRequestRetryHandler() { + @Override + public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { + if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃 + log.error("已完成重试次数"); + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof ConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + if (!(request instanceof HttpEntityEnclosingRequest)) { + return true; + } + return false; + } + }; + PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(); + httpClientConnectionManager.setMaxTotal(maxTotal);//最大连接数 + httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);//并发数 + SSLContext sslContext = SSLContextBuilder.create() + .loadTrustMaterial(new TrustSelfSignedStrategy()) + .build(); + httpClient = HttpClientBuilder + .create() + .setConnectionManager(httpClientConnectionManager) + .setRetryHandler(httpRetryHandler) + .setSslcontext(sslContext) + .setSSLHostnameVerifier(new NoopHostnameVerifier()) + .build(); + } catch (Exception e) { + log.error("create httpClient error.", e); + } + return httpClient; } /** * 使用builder构建一个RequestConfig对象 */ @Bean(name = "requestConfig") - public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) { - return builder.build(); - } - - @Bean(name = "httpRetryHandler") - public HttpRequestRetryHandler getHttpRetryHandler() { - return new HttpRequestRetryHandler() { - @Override - public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { - if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃 - log.error("已完成重试次数"); - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof ConnectException) {// 连接被拒绝 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - if (!(request instanceof HttpEntityEnclosingRequest)) { - return true; - } - return false; - } - }; + public RequestConfig getRequestConfig() { + return RequestConfig.custom().setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectionRequestTimeout) + .setSocketTimeout(socketTimeout) + .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled).build(); } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java similarity index 88% rename from src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java rename to src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java index ace726a..d219c3b 100644 --- a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java +++ b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java @@ -10,7 +10,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; @@ -22,7 +21,7 @@ import java.util.Map; //@ConfigurationProperties(prefix = "kafka.consumer") @NacosConfigurationProperties(prefix = "kafka.consumer", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) @EnableKafka -public class KafkaConsumerConfig { +public class KafkaConsumer { private String servers; private boolean enable_auto_commit; @@ -88,7 +87,7 @@ public class KafkaConsumerConfig { @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); + factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setConcurrency(concurrency); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setBatchListener(batch_listener);//设置为批量消费 @@ -96,11 +95,6 @@ public class KafkaConsumerConfig { return factory; } - private ConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory<>(consumerConfigs()); - } - - private Map consumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); @@ -112,9 +106,11 @@ public class KafkaConsumerConfig { propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset); - propsMap.put("security.protocol", "SASL_PLAINTEXT"); - propsMap.put("sasl.mechanism", "PLAIN"); - propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";"); + if(servers.contains("9094")){ + propsMap.put("security.protocol", "SASL_PLAINTEXT"); + propsMap.put("sasl.mechanism", "PLAIN"); + propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";"); + } return propsMap; } @@ -122,5 +118,4 @@ public class KafkaConsumerConfig { public KafkaConsumerListener listener() { return new KafkaConsumerListener(); } - } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java index f0ea5ba..6cdffae 100644 --- a/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java +++ b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java @@ -6,8 +6,7 @@ import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; @Configuration //@ConfigurationProperties(prefix = "thread") @@ -25,7 +24,9 @@ public class ThreadPoolFactory { } @Bean(name = "threadPool") - public ExecutorService getThreadPool() { - return Executors.newFixedThreadPool(maxSize); + public ThreadPoolExecutor getThreadPool() { + return new ThreadPoolExecutor( + maxSize, maxSize, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxSize*2)); } } diff --git a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java index ff33ba2..4df5f4d 100644 --- a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java +++ b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java @@ -9,6 +9,7 @@ import com.zdjizhi.syncfile.core.SyncFiles; import com.zdjizhi.syncfile.entity.Source; import com.zdjizhi.syncfile.entity.SysFileSync; import com.zdjizhi.syncfile.monitor.MonitorProperties; +import com.zdjizhi.syncfile.utils.HttpUtil; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; @@ -17,6 +18,7 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; @Component public class KafkaConsumerListener { @@ -26,6 +28,10 @@ public class KafkaConsumerListener { SyncFiles syncFiles; @Autowired MonitorProperties monitorProperties; + @Autowired + HttpUtil httpUtil; + @Autowired + ThreadPoolExecutor threadPool; @KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory") public void listen(List> records, Acknowledgment ack) { @@ -36,19 +42,26 @@ public class KafkaConsumerListener { JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString()); SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class); if (sysFileSync != null) { - List sourceList = sysFileSync.getSourceList(); - if(sourceList.size() < 1){ - log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString()); + List sourceList = sysFileSync.getSource_list(); + if (sourceList.size() < 1) { + log.error("kafka data error, sourceList is null. kafka data: " + record.value().toString()); monitorProperties.addFileSyncError(); - }else { + } else { fileList.add(sourceList); } - }else { - log.error("parse kafka data error. kafka data: "+record.value().toString()); + } else { + log.error("parse kafka data error. kafka data: " + record.value().toString()); monitorProperties.addFileSyncError(); } + monitorProperties.addKafkaRecordCount(); } - syncFiles.syncFiles(fileList); + while (true) { + if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) { + threadPool.submit(() -> syncFiles.syncFiles(fileList)); + break; + } + } + ack.acknowledge(); } catch (Exception e) { log.error("consume kafka data error.", e); monitorProperties.addFileSyncError(); diff --git a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java index b83ee5c..ecf608a 100644 --- a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java +++ b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java @@ -2,17 +2,13 @@ package com.zdjizhi.syncfile.core; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.syncfile.config.ThreadPoolFactory; import com.zdjizhi.syncfile.entity.Source; import com.zdjizhi.syncfile.monitor.MonitorProperties; import com.zdjizhi.syncfile.utils.HttpUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; @Component public class SyncFiles { @@ -21,62 +17,33 @@ public class SyncFiles { @Autowired private HttpUtil httpUtil; @Autowired - private ExecutorService threadPool; - @Autowired - private ThreadPoolFactory threadPoolFactory; - @Autowired MonitorProperties monitorProperties; public void syncFiles(List> fileList) { - List> callableList = new ArrayList<>(); try { for (List sourceList : fileList) { - callableList.add(() -> { - boolean status = false; - try { - for (Source source : sourceList) { - String source_oss_path = source.getSource_oss_path(); - String destination_oss_path = source.getDestination_oss_path(); - if (source_oss_path != null && !"".equals(source_oss_path) - && destination_oss_path != null && !"".equals(destination_oss_path)) { - byte[] file = httpUtil.httpGetFile(source_oss_path); - if (file != null) { - boolean isSuccess = httpUtil.httpPostFile(destination_oss_path, file); - if (!isSuccess) { - log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path); - monitorProperties.addPostFileErrorCount(); - return false; - } else { - status = true; - } - } else { - log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path); - monitorProperties.addDownloadFileErrorCount(); - return false; - } - } else { - log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path); - monitorProperties.addFileSyncError(); - return false; + for (Source source : sourceList) { + String source_oss_path = source.getSource_oss_path(); + String destination_oss_path = source.getDestination_oss_path(); + if (source_oss_path != null && !"".equals(source_oss_path) && destination_oss_path != null && !"".equals(destination_oss_path)) { + byte[] file = httpUtil.httpGetFile(source_oss_path); + if (file != null) { + boolean isSuccess = httpUtil.httpPostFile(destination_oss_path, file); + if (!isSuccess) { + log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path); + monitorProperties.addPostFileErrorCount(); } + } else { + log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path); + monitorProperties.addDownloadFileErrorCount(); } - } catch (Exception e) { - log.error("Sync file failed.", e); + } else { + log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path); monitorProperties.addFileSyncError(); - status = false; } - return status; - }); - if (callableList.size() == threadPoolFactory.getMaxSize()) { - threadPool.invokeAll(callableList); - callableList.clear(); } } - if (callableList.size() > 0) { - threadPool.invokeAll(callableList); - callableList.clear(); - } - } catch (InterruptedException e) { + } catch (Exception e) { log.error("Sync files failed.", e); monitorProperties.addFileSyncError(); } diff --git a/src/main/java/com/zdjizhi/syncfile/entity/Source.java b/src/main/java/com/zdjizhi/syncfile/entity/Source.java index 366787e..0e2e90b 100644 --- a/src/main/java/com/zdjizhi/syncfile/entity/Source.java +++ b/src/main/java/com/zdjizhi/syncfile/entity/Source.java @@ -5,6 +5,11 @@ public class Source { private String source_oss_path; private String destination_oss_path; + public Source(String source_oss_path, String destination_oss_path) { + this.source_oss_path = source_oss_path; + this.destination_oss_path = destination_oss_path; + } + public String getSource_oss_path() { return source_oss_path; } diff --git a/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java index d979f3a..59e24d2 100644 --- a/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java +++ b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java @@ -3,18 +3,18 @@ package com.zdjizhi.syncfile.entity; import java.util.List; public class SysFileSync { - private List sourceList; + private List source_list; private long common_log_id; private long common_recv_time; private String common_schema_type; private long processing_time; - public List getSourceList() { - return sourceList; + public List getSource_list() { + return source_list; } - public void setSourceList(List sourceList) { - this.sourceList = sourceList; + public void setSource_list(List source_list) { + this.source_list = source_list; } public long getCommon_log_id() { diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java index 942c0e6..eec9efb 100644 --- a/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java +++ b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java @@ -57,12 +57,15 @@ public class LogChartMetricsFilter implements Filter { Long hosError = monitorProperties.getHosError(); Long ossError = monitorProperties.getOssError(); + Long kafkaRecordCount = monitorProperties.getKafkaRecordCount(); + dashboardMap.put("downloadFileSuccessCount", downloadFileSuccessCount); dashboardMap.put("downloadFileErrorCount",downloadFileErrorCount); dashboardMap.put("postFileSuccessCount", postFileSuccessCount); dashboardMap.put("postFileErrorCount", postFileErrorCount); dashboardMap.put("downloadFileSize", downloadFileSize); dashboardMap.put("postFileSize", postFileSize); + dashboardMap.put("kafkaRecordCount",kafkaRecordCount); errorTypeMap.put("fileSyncError",fileSyncError); errorTypeMap.put("hosError",hosError); @@ -91,5 +94,4 @@ public class LogChartMetricsFilter implements Filter { } } } - } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java index d81dbd0..6d7f911 100644 --- a/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java +++ b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java @@ -4,6 +4,8 @@ import org.springframework.context.annotation.Configuration; @Configuration public class MonitorProperties { + private static Long kafkaRecordCount = 0L; + private static Long downloadFileSuccessCount = 0L; private static Long downloadFileErrorCount = 0L; private static Long postFileSuccessCount = 0L; @@ -15,6 +17,14 @@ public class MonitorProperties { private static Long hosError = 0L; private static Long ossError = 0L; + public Long getKafkaRecordCount() { + return kafkaRecordCount; + } + + public void addKafkaRecordCount() { + kafkaRecordCount = kafkaRecordCount + 1; + } + public void addDownloadFileSuccessCount() { downloadFileSuccessCount = downloadFileSuccessCount + 1; } diff --git a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java index 4398129..b36aaf5 100644 --- a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java +++ b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java @@ -26,7 +26,7 @@ public class HttpUtil { @Autowired private RequestConfig requestConfig; @Autowired - MonitorProperties monitorProperties; + private MonitorProperties monitorProperties; public byte[] httpGetFile(String url) { byte[] data = null;