优化代码及性能

This commit is contained in:
houjinchuan
2023-09-26 16:39:27 +08:00
parent bea54cff55
commit 8f64e062f9
12 changed files with 137 additions and 185 deletions

View File

@@ -2,7 +2,7 @@ nacos:
config:
type: yaml
server-addr: 192.168.44.12:8848
namespace: dev
namespace: test
data-id: p19-file-sync-service
auto-refresh: true
group: Galaxy

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>p19-file-sync-service</artifactId>
<version>23.03.09</version>
<version>23.09.26</version>
<name>p19-file-sync-service</name>
<parent>
@@ -104,13 +104,11 @@
<artifactId>hutool-all</artifactId>
<version>5.5.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@@ -136,13 +134,11 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>${nacos.config.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>

View File

@@ -10,12 +10,13 @@ import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.springframework.beans.factory.annotation.Qualifier;
import org.apache.http.ssl.SSLContextBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@@ -77,109 +78,71 @@ public class HttpClientPool {
this.retryNum = retryNum;
}
/**
* 首先实例化一个连接池管理器,设置最大连接数、并发连接数
*
* @return
*/
@Bean(name = "httpClientConnectionManager")
public PoolingHttpClientConnectionManager getHttpClientConnectionManager() {
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
//最大连接数
httpClientConnectionManager.setMaxTotal(maxTotal);
//并发数
httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
return httpClientConnectionManager;
}
/**
* 实例化连接池,设置连接池管理器。
* 这里需要以参数形式注入上面实例化的连接池管理器
*
* @param httpClientConnectionManager
* @return
*/
@Bean(name = "httpClientBuilder")
public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) {
//HttpClientBuilder中的构造方法被protected修饰所以这里不能直接使用new来实例化一个HttpClientBuilder可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(httpClientConnectionManager);
return httpClientBuilder;
}
/**
* 注入连接池用于获取httpClient
*
* @param httpClientBuilder
* @return
*/
@Bean
public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder,@Qualifier("httpRetryHandler") HttpRequestRetryHandler httpRetryHandler){
return httpClientBuilder
.setRetryHandler(httpRetryHandler)
.build();
}
/**
* Builder是RequestConfig的一个内部类
* 通过RequestConfig的custom方法来获取到一个Builder对象
* 设置builder的连接信息
* 这里还可以设置proxycookieSpec等属性。有需要的话可以在此设置
*
* @return
*/
@Bean(name = "builder")
public RequestConfig.Builder getBuilder() {
RequestConfig.Builder builder = RequestConfig.custom();
return builder.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
@Bean(name = "httpClient")
public CloseableHttpClient getCloseableHttpClient(){
CloseableHttpClient httpClient = null;
try {
HttpRequestRetryHandler httpRetryHandler = new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
if (executionCount >= retryNum) {// 如果已经重试了3次就放弃
log.error("已完成重试次数");
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
return true;
}
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
return false;
}
if (exception instanceof ConnectException) {// 连接被拒绝
return false;
}
if (exception instanceof InterruptedIOException) {// 超时
return true;
}
if (exception instanceof UnknownHostException) {// 目标服务器不可达
return false;
}
if (exception instanceof SSLException) {// ssl握手异常
return false;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// 如果请求是幂等的,就再次尝试
if (!(request instanceof HttpEntityEnclosingRequest)) {
return true;
}
return false;
}
};
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
httpClientConnectionManager.setMaxTotal(maxTotal);//最大连接数
httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);//并发数
SSLContext sslContext = SSLContextBuilder.create()
.loadTrustMaterial(new TrustSelfSignedStrategy())
.build();
httpClient = HttpClientBuilder
.create()
.setConnectionManager(httpClientConnectionManager)
.setRetryHandler(httpRetryHandler)
.setSslcontext(sslContext)
.setSSLHostnameVerifier(new NoopHostnameVerifier())
.build();
} catch (Exception e) {
log.error("create httpClient error.", e);
}
return httpClient;
}
/**
* 使用builder构建一个RequestConfig对象
*/
@Bean(name = "requestConfig")
public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) {
return builder.build();
}
@Bean(name = "httpRetryHandler")
public HttpRequestRetryHandler getHttpRetryHandler() {
return new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
if (executionCount >= retryNum) {// 如果已经重试了3次就放弃
log.error("已完成重试次数");
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
return true;
}
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
return false;
}
if (exception instanceof ConnectException) {// 连接被拒绝
return false;
}
if (exception instanceof InterruptedIOException) {// 超时
return true;
}
if (exception instanceof UnknownHostException) {// 目标服务器不可达
return false;
}
if (exception instanceof SSLException) {// ssl握手异常
return false;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// 如果请求是幂等的,就再次尝试
if (!(request instanceof HttpEntityEnclosingRequest)) {
return true;
}
return false;
}
};
public RequestConfig getRequestConfig() {
return RequestConfig.custom().setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled).build();
}
}

View File

@@ -10,7 +10,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
@@ -22,7 +21,7 @@ import java.util.Map;
//@ConfigurationProperties(prefix = "kafka.consumer")
@NacosConfigurationProperties(prefix = "kafka.consumer", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
@EnableKafka
public class KafkaConsumerConfig {
public class KafkaConsumer {
private String servers;
private boolean enable_auto_commit;
@@ -88,7 +87,7 @@ public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(batch_listener);//设置为批量消费
@@ -96,11 +95,6 @@ public class KafkaConsumerConfig {
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@@ -112,9 +106,11 @@ public class KafkaConsumerConfig {
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
propsMap.put("security.protocol", "SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");
propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
if(servers.contains("9094")){
propsMap.put("security.protocol", "SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");
propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
}
return propsMap;
}
@@ -122,5 +118,4 @@ public class KafkaConsumerConfig {
public KafkaConsumerListener listener() {
return new KafkaConsumerListener();
}
}

View File

@@ -6,8 +6,7 @@ import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
@Configuration
//@ConfigurationProperties(prefix = "thread")
@@ -25,7 +24,9 @@ public class ThreadPoolFactory {
}
@Bean(name = "threadPool")
public ExecutorService getThreadPool() {
return Executors.newFixedThreadPool(maxSize);
public ThreadPoolExecutor getThreadPool() {
return new ThreadPoolExecutor(
maxSize, maxSize,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxSize*2));
}
}

View File

@@ -9,6 +9,7 @@ import com.zdjizhi.syncfile.core.SyncFiles;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.entity.SysFileSync;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
import com.zdjizhi.syncfile.utils.HttpUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
@@ -17,6 +18,7 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
@Component
public class KafkaConsumerListener {
@@ -26,6 +28,10 @@ public class KafkaConsumerListener {
SyncFiles syncFiles;
@Autowired
MonitorProperties monitorProperties;
@Autowired
HttpUtil httpUtil;
@Autowired
ThreadPoolExecutor threadPool;
@KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
@@ -36,19 +42,26 @@ public class KafkaConsumerListener {
JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString());
SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class);
if (sysFileSync != null) {
List<Source> sourceList = sysFileSync.getSourceList();
if(sourceList.size() < 1){
log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString());
List<Source> sourceList = sysFileSync.getSource_list();
if (sourceList.size() < 1) {
log.error("kafka data error, sourceList is null. kafka data: " + record.value().toString());
monitorProperties.addFileSyncError();
}else {
} else {
fileList.add(sourceList);
}
}else {
log.error("parse kafka data error. kafka data: "+record.value().toString());
} else {
log.error("parse kafka data error. kafka data: " + record.value().toString());
monitorProperties.addFileSyncError();
}
monitorProperties.addKafkaRecordCount();
}
syncFiles.syncFiles(fileList);
while (true) {
if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
threadPool.submit(() -> syncFiles.syncFiles(fileList));
break;
}
}
ack.acknowledge();
} catch (Exception e) {
log.error("consume kafka data error.", e);
monitorProperties.addFileSyncError();

View File

@@ -2,17 +2,13 @@ package com.zdjizhi.syncfile.core;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.syncfile.config.ThreadPoolFactory;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
import com.zdjizhi.syncfile.utils.HttpUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@Component
public class SyncFiles {
@@ -21,62 +17,33 @@ public class SyncFiles {
@Autowired
private HttpUtil httpUtil;
@Autowired
private ExecutorService threadPool;
@Autowired
private ThreadPoolFactory threadPoolFactory;
@Autowired
MonitorProperties monitorProperties;
public void syncFiles(List<List<Source>> fileList) {
List<Callable<Boolean>> callableList = new ArrayList<>();
try {
for (List<Source> sourceList : fileList) {
callableList.add(() -> {
boolean status = false;
try {
for (Source source : sourceList) {
String source_oss_path = source.getSource_oss_path();
String destination_oss_path = source.getDestination_oss_path();
if (source_oss_path != null && !"".equals(source_oss_path)
&& destination_oss_path != null && !"".equals(destination_oss_path)) {
byte[] file = httpUtil.httpGetFile(source_oss_path);
if (file != null) {
boolean isSuccess = httpUtil.httpPostFile(destination_oss_path, file);
if (!isSuccess) {
log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path);
monitorProperties.addPostFileErrorCount();
return false;
} else {
status = true;
}
} else {
log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path);
monitorProperties.addDownloadFileErrorCount();
return false;
}
} else {
log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path);
monitorProperties.addFileSyncError();
return false;
for (Source source : sourceList) {
String source_oss_path = source.getSource_oss_path();
String destination_oss_path = source.getDestination_oss_path();
if (source_oss_path != null && !"".equals(source_oss_path) && destination_oss_path != null && !"".equals(destination_oss_path)) {
byte[] file = httpUtil.httpGetFile(source_oss_path);
if (file != null) {
boolean isSuccess = httpUtil.httpPostFile(destination_oss_path, file);
if (!isSuccess) {
log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path);
monitorProperties.addPostFileErrorCount();
}
} else {
log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path);
monitorProperties.addDownloadFileErrorCount();
}
} catch (Exception e) {
log.error("Sync file failed.", e);
} else {
log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path);
monitorProperties.addFileSyncError();
status = false;
}
return status;
});
if (callableList.size() == threadPoolFactory.getMaxSize()) {
threadPool.invokeAll(callableList);
callableList.clear();
}
}
if (callableList.size() > 0) {
threadPool.invokeAll(callableList);
callableList.clear();
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Sync files failed.", e);
monitorProperties.addFileSyncError();
}

View File

@@ -5,6 +5,11 @@ public class Source {
private String source_oss_path;
private String destination_oss_path;
public Source(String source_oss_path, String destination_oss_path) {
this.source_oss_path = source_oss_path;
this.destination_oss_path = destination_oss_path;
}
public String getSource_oss_path() {
return source_oss_path;
}

View File

@@ -3,18 +3,18 @@ package com.zdjizhi.syncfile.entity;
import java.util.List;
public class SysFileSync {
private List<Source> sourceList;
private List<Source> source_list;
private long common_log_id;
private long common_recv_time;
private String common_schema_type;
private long processing_time;
public List<Source> getSourceList() {
return sourceList;
public List<Source> getSource_list() {
return source_list;
}
public void setSourceList(List<Source> sourceList) {
this.sourceList = sourceList;
public void setSource_list(List<Source> source_list) {
this.source_list = source_list;
}
public long getCommon_log_id() {

View File

@@ -57,12 +57,15 @@ public class LogChartMetricsFilter implements Filter {
Long hosError = monitorProperties.getHosError();
Long ossError = monitorProperties.getOssError();
Long kafkaRecordCount = monitorProperties.getKafkaRecordCount();
dashboardMap.put("downloadFileSuccessCount", downloadFileSuccessCount);
dashboardMap.put("downloadFileErrorCount",downloadFileErrorCount);
dashboardMap.put("postFileSuccessCount", postFileSuccessCount);
dashboardMap.put("postFileErrorCount", postFileErrorCount);
dashboardMap.put("downloadFileSize", downloadFileSize);
dashboardMap.put("postFileSize", postFileSize);
dashboardMap.put("kafkaRecordCount",kafkaRecordCount);
errorTypeMap.put("fileSyncError",fileSyncError);
errorTypeMap.put("hosError",hosError);
@@ -91,5 +94,4 @@ public class LogChartMetricsFilter implements Filter {
}
}
}
}

View File

@@ -4,6 +4,8 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class MonitorProperties {
private static Long kafkaRecordCount = 0L;
private static Long downloadFileSuccessCount = 0L;
private static Long downloadFileErrorCount = 0L;
private static Long postFileSuccessCount = 0L;
@@ -15,6 +17,14 @@ public class MonitorProperties {
private static Long hosError = 0L;
private static Long ossError = 0L;
public Long getKafkaRecordCount() {
return kafkaRecordCount;
}
public void addKafkaRecordCount() {
kafkaRecordCount = kafkaRecordCount + 1;
}
public void addDownloadFileSuccessCount() {
downloadFileSuccessCount = downloadFileSuccessCount + 1;
}

View File

@@ -26,7 +26,7 @@ public class HttpUtil {
@Autowired
private RequestConfig requestConfig;
@Autowired
MonitorProperties monitorProperties;
private MonitorProperties monitorProperties;
public byte[] httpGetFile(String url) {
byte[] data = null;