diff --git a/config/application.yml b/config/application.yml
new file mode 100644
index 0000000..a2f9d98
--- /dev/null
+++ b/config/application.yml
@@ -0,0 +1,19 @@
+nacos:
+  config:
+    type: yaml
+    server-addr: 192.168.44.12:8848
+    namespace: dev
+    data-id: p19-file-sync-service
+    auto-refresh: true
+    group: Galaxy
+    username: nacos
+    password: nacos
+    bootstrap:
+      enable: true
+      log:
+        enable: true
+spring:
+  profiles:
+    active: dev
+logging:
+  config: ./config/log4j2-dev.xml
\ No newline at end of file
diff --git a/config/log4j2-dev.xml b/config/log4j2-dev.xml
new file mode 100644
index 0000000..0cf368f
--- /dev/null
+++ b/config/log4j2-dev.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Reconstructed log4j2 configuration: only the property values and the pattern layout
+     were recoverable from the original file; the property names and the appender/logger
+     layout below are inferred. -->
+<Configuration>
+    <Properties>
+        <Property name="FILE_SIZE">200M</Property>
+        <Property name="MAX_FILES">10</Property>
+        <Property name="LOG_LEVEL">info</Property>
+        <Property name="LOG_DIR">logs</Property>
+        <Property name="APP_NAME">p19-file-sync-service</Property>
+        <Property name="LOG_PATTERN">[%d{yyyy-MM-dd HH:mm:ss}] [%p] [Thread:%t] %l %x - %m%n</Property>
+    </Properties>
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="${LOG_PATTERN}"/>
+        </Console>
+        <RollingFile name="RollingFile"
+                     fileName="${LOG_DIR}/${APP_NAME}.log"
+                     filePattern="${LOG_DIR}/${APP_NAME}-%d{yyyy-MM-dd}-%i.log">
+            <PatternLayout pattern="${LOG_PATTERN}"/>
+            <Policies>
+                <SizeBasedTriggeringPolicy size="${FILE_SIZE}"/>
+            </Policies>
+            <DefaultRolloverStrategy max="${MAX_FILES}"/>
+        </RollingFile>
+    </Appenders>
+    <Loggers>
+        <Root level="${LOG_LEVEL}">
+            <AppenderRef ref="Console"/>
+            <AppenderRef ref="RollingFile"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..003dc2b
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,24 @@
+ARG JDK_IMAGE
+ARG GO_IMAGE
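+# Example of a manual image build (a sketch; in this project the spotify docker-maven-plugin
+# normally supplies these build args during `mvn package docker:build`, and the exact
+# JAR_FILE name depends on the Maven finalName):
+#   docker build \
+#     --build-arg JDK_IMAGE=192.168.40.153:9080/common/jdk:1.8.0_73-jre \
+#     --build-arg GO_IMAGE=192.168.40.153:9080/common/golang:1.15.6 \
+#     --build-arg JAR_FILE=p19-file-sync-service.xjar .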
+#Build stage: compile the xjar launcher binary
+#FROM 192.168.40.153:9080/common/golang:1.15.6 as builder
+FROM ${GO_IMAGE} as builder
+
+WORKDIR /build
+COPY xjar.go /build/
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o xjar .
+
+FROM ${JDK_IMAGE}
+LABEL maintainer="Galaxy"
+VOLUME /tmp
+WORKDIR /home/tsg/galaxy/p19-file-sync-service
+COPY --from=builder /build .
+COPY config config
+ARG JAR_FILE
+COPY ${JAR_FILE} p19-file-sync-service.xjar
+#JAVA_OPTS is typically overridden via docker-compose
+ENV JAVA_OPTS=" -Xmx1g -Xms1g -Xmn128m -Xss256k -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:SurvivorRatio=2 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "
+ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
+#ENV TZ=Asia/Almaty
+#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
+ENTRYPOINT [ "sh", "-c", "./xjar java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar p19-file-sync-service.xjar" ]
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7d84db1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,269 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.zdjizhi</groupId>
+    <artifactId>p19-file-sync-service</artifactId>
+    <version>21.12.01</version>
+    <name>p19-file-sync-service</name>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.7.RELEASE</version>
+    </parent>
+
+    <repositories>
+        <repository>
+            <id>nexus</id>
+            <name>Team Nexus Repository</name>
+            <url>http://192.168.40.125:8099/content/groups/public/</url>
+        </repository>
+    </repositories>
+
+    <pluginRepositories>
+        <pluginRepository>
+            <id>nexus</id>
+            <name>Team Nexus Repository</name>
+            <url>http://192.168.40.125:8099/content/groups/public/</url>
+        </pluginRepository>
+    </pluginRepositories>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <java.version>1.8</java.version>
+        <nacos.config.version>0.2.8</nacos.config.version>
+        <!-- The two property names below are inferred; only the values (the Docker daemon
+             host and port) were recoverable from the original file. -->
+        <docker.host>192.168.40.153</docker.host>
+        <docker.host.port>2375</docker.host.port>
+        <docker.registry>192.168.40.153</docker.registry>
+        <docker.registry.port>9080</docker.registry.port>
+        <docker.image.prefix>tsg/galaxy</docker.image.prefix>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-xml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.30</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.30</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.69</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.5.7</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.1.3</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.boot</groupId>
+            <artifactId>nacos-config-spring-boot-starter</artifactId>
+            <version>${nacos.config.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <version>1.5.4</version>
+        </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-core</artifactId>
+            <version>1.5.4</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>io.github.zlika</groupId>
+                <artifactId>reproducible-build-maven-plugin</artifactId>
+                <version>0.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>strip-jar</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.mesalab.xjar-maven-plugin</groupId>
+                <artifactId>mesalab-xjar-maven-plugin</artifactId>
+                <version>1.0.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <!-- Configuration element names below are inferred; only the values
+                             (the encryption password and the resource patterns left
+                             unencrypted) were recoverable from the original file. -->
+                        <configuration>
+                            <password>Geedge2020!</password>
+                            <excludes>
+                                <exclude>static/**</exclude>
+                                <exclude>templates/**</exclude>
+                                <exclude>resources/**</exclude>
+                                <exclude>META-INF/resources/**</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.spotify</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>1.0.0</version>
+                <configuration>
+                    <serverId>153-docker-repo</serverId>
+                    <registryUrl>${docker.registry}:${docker.registry.port}</registryUrl>
+                    <!-- pushImage/forceTags element names inferred; only the two boolean
+                         values were recoverable. -->
+                    <pushImage>true</pushImage>
+                    <imageName>${docker.registry}:${docker.registry.port}/${docker.image.prefix}/${project.artifactId}</imageName>
+                    <forceTags>true</forceTags>
+                    <imageTags>
+                        <imageTag>${project.version}</imageTag>
+                    </imageTags>
+                    <dockerHost>http://192.168.40.153:2375</dockerHost>
+                    <dockerDirectory>docker</dockerDirectory>
+                    <buildArgs>
+                        <JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73-jre</JDK_IMAGE>
+                        <GO_IMAGE>192.168.40.153:9080/common/golang:1.15.6</GO_IMAGE>
+                        <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
+                    </buildArgs>
+                    <resources>
+                        <resource>
+                            <targetPath>/</targetPath>
+                            <directory>${project.build.directory}</directory>
+                            <includes>
+                                <include>${project.build.finalName}.xjar</include>
+                                <include>xjar.go</include>
+                            </includes>
+                        </resource>
+                        <resource>
+                            <targetPath>/config</targetPath>
+                            <directory>config</directory>
+                        </resource>
+                    </resources>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java b/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java
new file mode 100644
index 0000000..0208866
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.syncfile;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class P19FileSyncServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(P19FileSyncServiceApplication.class, args);
+ }
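+ /*
+ * Adds an "application" tag (from spring.application.name) to every meter so the
+ * Prometheus series can be filtered per service. The property is not defined in
+ * config/application.yml, so it is expected to come from the Nacos-managed config.
+ */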
+ @Bean
+ MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName){
+ return registry -> registry.config().commonTags("application", applicationName);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
new file mode 100644
index 0000000..e9f47c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
@@ -0,0 +1,125 @@
+package com.zdjizhi.syncfile.config;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+
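+/*
+ * Expected keys under the "http" prefix in the Nacos-managed YAML (a sketch derived
+ * from the setters below; the values shown are illustrative, not taken from the source):
+ *
+ * http:
+ *   maxTotal: 200
+ *   defaultMaxPerRoute: 50
+ *   connectTimeout: 5000
+ *   connectionRequestTimeout: 5000
+ *   socketTimeout: 30000
+ *   staleConnectionCheckEnabled: true
+ */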
+@Component
+//@ConfigurationProperties(prefix = "http")
+@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class HttpClientPool {
+
+ private Integer maxTotal;
+
+ private Integer defaultMaxPerRoute;
+
+ private Integer connectTimeout;
+
+ private Integer connectionRequestTimeout;
+
+ private Integer socketTimeout;
+
+ private boolean staleConnectionCheckEnabled;
+
+
+ public void setMaxTotal(Integer maxTotal) {
+ this.maxTotal = maxTotal;
+ }
+
+ public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) {
+ this.defaultMaxPerRoute = defaultMaxPerRoute;
+ }
+
+ public void setConnectTimeout(Integer connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
+ this.connectionRequestTimeout = connectionRequestTimeout;
+ }
+
+ public void setSocketTimeout(Integer socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) {
+ this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
+ }
+
+ /**
+ * Instantiate the pooling connection manager and set the maximum total and concurrent (per-route) connections.
+ *
+ * @return
+ */
+ @Bean(name = "httpClientConnectionManager")
+ public PoolingHttpClientConnectionManager getHttpClientConnectionManager() {
+ PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
+ //maximum total connections
+ httpClientConnectionManager.setMaxTotal(maxTotal);
+ //maximum connections per route
+ httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
+ return httpClientConnectionManager;
+ }
+
+ /**
+ * Create the HttpClientBuilder and wire in the connection manager.
+ * The pooling connection manager instantiated above is injected as a method parameter.
+ *
+ * @param httpClientConnectionManager
+ * @return
+ */
+ @Bean(name = "httpClientBuilder")
+ public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) {
+ //HttpClientBuilder's constructor is protected, so it cannot be instantiated with new; use the static factory method HttpClientBuilder.create() instead
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.setConnectionManager(httpClientConnectionManager);
+ return httpClientBuilder;
+ }
+
+ /**
+ * Build the pooled CloseableHttpClient from the injected HttpClientBuilder.
+ *
+ * @param httpClientBuilder
+ * @return
+ */
+ @Bean
+ public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder) {
+ return httpClientBuilder.build();
+ }
+
+ /**
+ * Builder is an inner class of RequestConfig; obtain one via RequestConfig.custom()
+ * and set the connection timeout values on it.
+ * Proxy, cookieSpec and other attributes can also be configured here if needed.
+ *
+ * @return
+ */
+ @Bean(name = "builder")
+ public RequestConfig.Builder getBuilder() {
+ RequestConfig.Builder builder = RequestConfig.custom();
+ return builder.setConnectTimeout(connectTimeout)
+ .setConnectionRequestTimeout(connectionRequestTimeout)
+ .setSocketTimeout(socketTimeout)
+ .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
+
+ }
+
+ /**
+ * Build a RequestConfig object from the builder.
+ */
+ @Bean(name = "requestConfig")
+ public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) {
+ return builder.build();
+ }
+
+
+}
+
diff --git a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..ace726a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java
@@ -0,0 +1,126 @@
+package com.zdjizhi.syncfile.config;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import com.zdjizhi.syncfile.consumer.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
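+/*
+ * Expected keys under the "kafka.consumer" prefix in the Nacos-managed YAML (a sketch
+ * derived from the setters below; values are illustrative, not taken from the source).
+ * The "topic" key is read directly by the @KafkaListener in KafkaConsumerListener:
+ *
+ * kafka:
+ *   consumer:
+ *     servers: kafka-host:9092
+ *     enable_auto_commit: false
+ *     session_timeout: "30000"
+ *     auto_commit_interval: "1000"
+ *     group_id: p19-file-sync-service
+ *     auto_offset_reset: latest
+ *     poll_timeout: 3000
+ *     concurrency: 3
+ *     batch_listener: true
+ *     pool_record: 100
+ *     sasl_username: example-user
+ *     sasl_password: example-password
+ *     topic: file-sync-topic
+ */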
+@Configuration
+//@ConfigurationProperties(prefix = "kafka.consumer")
+@NacosConfigurationProperties(prefix = "kafka.consumer", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+@EnableKafka
+public class KafkaConsumerConfig {
+
+ private String servers;
+ private boolean enable_auto_commit;
+ private String session_timeout;
+ private String auto_commit_interval;
+ private String group_id;
+ private String auto_offset_reset;
+ private int poll_timeout;
+ private int concurrency;
+ private boolean batch_listener;
+ private int pool_record;
+ private String sasl_username;
+ private String sasl_password;
+
+ public void setServers(String servers) {
+ this.servers = servers;
+ }
+
+ public void setEnable_auto_commit(boolean enable_auto_commit) {
+ this.enable_auto_commit = enable_auto_commit;
+ }
+
+ public void setSession_timeout(String session_timeout) {
+ this.session_timeout = session_timeout;
+ }
+
+ public void setAuto_commit_interval(String auto_commit_interval) {
+ this.auto_commit_interval = auto_commit_interval;
+ }
+
+ public void setGroup_id(String group_id) {
+ this.group_id = group_id;
+ }
+
+ public void setAuto_offset_reset(String auto_offset_reset) {
+ this.auto_offset_reset = auto_offset_reset;
+ }
+
+ public void setPoll_timeout(int poll_timeout) {
+ this.poll_timeout = poll_timeout;
+ }
+
+ public void setConcurrency(int concurrency) {
+ this.concurrency = concurrency;
+ }
+
+ public void setBatch_listener(boolean batch_listener) {
+ this.batch_listener = batch_listener;
+ }
+
+ public void setPool_record(int pool_record) {
+ this.pool_record = pool_record;
+ }
+
+ public void setSasl_username(String sasl_username) {
+ this.sasl_username = sasl_username;
+ }
+
+ public void setSasl_password(String sasl_password) {
+ this.sasl_password = sasl_password;
+ }
+
+ @Bean
+ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.setConcurrency(concurrency);
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+ factory.setBatchListener(batch_listener);//enable batch consumption
+ factory.getContainerProperties().setPollTimeout(poll_timeout);
+ return factory;
+ }
+
+ private ConsumerFactory<String, String> consumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+ }
+
+
+ private Map<String, Object> consumerConfigs() {
+ Map<String, Object> propsMap = new HashMap<>();
+ propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
+ propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
+ propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool_record);//max records per poll (batch size)
+ propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, session_timeout);
+ propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
+ propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
+ propsMap.put("security.protocol", "SASL_PLAINTEXT");
+ propsMap.put("sasl.mechanism", "PLAIN");
+ propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
+ return propsMap;
+ }
+
+ @Bean
+ public KafkaConsumerListener listener() {
+ return new KafkaConsumerListener();
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java
new file mode 100644
index 0000000..f0ea5ba
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java
@@ -0,0 +1,31 @@
+package com.zdjizhi.syncfile.config;
+
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
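+/*
+ * Expected key under the "thread" prefix in the Nacos-managed YAML (illustrative value):
+ *
+ * thread:
+ *   maxSize: 8
+ */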
+@Configuration
+//@ConfigurationProperties(prefix = "thread")
+@NacosConfigurationProperties(prefix = "thread", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class ThreadPoolFactory {
+
+ private Integer maxSize;
+
+ public void setMaxSize(Integer maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public Integer getMaxSize() {
+ return maxSize;
+ }
+
+ @Bean(name = "threadPool")
+ public ExecutorService getThreadPool() {
+ return Executors.newFixedThreadPool(maxSize);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
new file mode 100644
index 0000000..ff33ba2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
@@ -0,0 +1,59 @@
+package com.zdjizhi.syncfile.consumer;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
+import com.zdjizhi.syncfile.core.SyncFiles;
+import com.zdjizhi.syncfile.entity.Source;
+import com.zdjizhi.syncfile.entity.SysFileSync;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class KafkaConsumerListener {
+ private Log log = LogFactory.get();
+
+ @Autowired
+ SyncFiles syncFiles;
+ @Autowired
+ MonitorProperties monitorProperties;
+
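+ /*
+ * Sketch of the expected message body (field names follow SysFileSync and Source;
+ * the values are illustrative, not taken from the source):
+ *
+ * {
+ *   "common_log_id": 1,
+ *   "common_recv_time": 1639000000000,
+ *   "common_schema_type": "file_sync",
+ *   "processing_time": 1639000000001,
+ *   "sourceList": [
+ *     {"source_oss_path": "http://hos-host/bucket/file.bin",
+ *      "destination_oss_path": "http://oss-host/bucket/file.bin"}
+ *   ]
+ * }
+ */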
+ @KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")
+ public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+ try {
+ List<List<Source>> fileList = new ArrayList<>();
+ for (ConsumerRecord<?, ?> record : records) {
+ log.info("consumed kafka record value: " + record.value().toString());
+ JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString());
+ SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class);
+ if (sysFileSync != null) {
+ List<Source> sourceList = sysFileSync.getSourceList();
+ if(sourceList == null || sourceList.isEmpty()){
+ log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString());
+ monitorProperties.addFileSyncError();
+ }else {
+ fileList.add(sourceList);
+ }
+ }else {
+ log.error("parse kafka data error. kafka data: "+record.value().toString());
+ monitorProperties.addFileSyncError();
+ }
+ }
+ syncFiles.syncFiles(fileList);
+ } catch (Exception e) {
+ log.error("consume kafka data error.", e);
+ monitorProperties.addFileSyncError();
+ } finally {
+ ack.acknowledge();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
new file mode 100644
index 0000000..6fc49df
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
@@ -0,0 +1,89 @@
+package com.zdjizhi.syncfile.core;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.syncfile.config.ThreadPoolFactory;
+import com.zdjizhi.syncfile.entity.Source;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import com.zdjizhi.syncfile.utils.HttpUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+@Component
+public class SyncFiles {
+ private Log log = LogFactory.get();
+
+ @Autowired
+ private HttpUtil httpUtil;
+ @Autowired
+ private ExecutorService threadPool;
+ @Autowired
+ private ThreadPoolFactory threadPoolFactory;
+ @Autowired
+ MonitorProperties monitorProperties;
+
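+ /*
+ * Each sourceList (one Kafka record) becomes one Callable that downloads every
+ * source_oss_path and re-posts it to the matching destination_oss_path. Callables are
+ * submitted in batches of threadPoolFactory.getMaxSize() via invokeAll, which blocks
+ * until the whole batch has finished before the next batch is built.
+ */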
+ public void syncFiles(List<List<Source>> fileList) {
+ List<Callable<Boolean>> callableList = new ArrayList<>();
+ try {
+ for (List<Source> sourceList : fileList) {
+ callableList.add(() -> {
+ boolean status = false;
+ InputStream content = null;
+ try {
+ for (Source source : sourceList) {
+ String source_oss_path = source.getSource_oss_path();
+ String destination_oss_path = source.getDestination_oss_path();
+ if (source_oss_path != null && !"".equals(source_oss_path)
+ && destination_oss_path != null && !"".equals(destination_oss_path)) {
+ content = httpUtil.httpGetInputStream(source_oss_path);
+ if (content != null) {
+ boolean isSuccess = httpUtil.httpPostInputStream(destination_oss_path, content);
+ if (!isSuccess) {
+ log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path);
+ monitorProperties.addPostFileErrorCount();
+ return false;
+ } else {
+ status = true;
+ }
+ } else {
+ log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path);
+ monitorProperties.addDownloadFileErrorCount();
+ return false;
+ }
+ } else {
+ log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path);
+ monitorProperties.addFileSyncError();
+ return false;
+ }
+ }
+ } catch (Exception e) {
+ log.error("Sync file failed.", e);
+ monitorProperties.addFileSyncError();
+ status = false;
+ } finally {
+ IoUtil.close(content);
+ }
+ return status;
+ });
+ if (callableList.size() == threadPoolFactory.getMaxSize()) {
+ threadPool.invokeAll(callableList);
+ callableList.clear();
+ }
+ }
+ if (callableList.size() > 0) {
+ threadPool.invokeAll(callableList);
+ callableList.clear();
+ }
+ } catch (InterruptedException e) {
+ log.error("Sync files failed.", e);
+ monitorProperties.addFileSyncError();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/Data.java b/src/main/java/com/zdjizhi/syncfile/entity/Data.java
new file mode 100644
index 0000000..8963823
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/Data.java
@@ -0,0 +1,40 @@
+package com.zdjizhi.syncfile.entity;
+
+public class Data {
+ private int fileSize;
+ private String sign;
+ private String state;
+
+ public int getFileSize() {
+ return fileSize;
+ }
+
+ public void setFileSize(int fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public String getSign() {
+ return sign;
+ }
+
+ public void setSign(String sign) {
+ this.sign = sign;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "fileSize=" + fileSize +
+ ", sign='" + sign + '\'' +
+ ", state='" + state + '\'' +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java b/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java
new file mode 100644
index 0000000..3e7a80b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.syncfile.entity;
+
+public class PostFileResponse {
+ private int code;
+ private Data data;
+ private String message;
+
+ public PostFileResponse() {
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public Data getData() {
+ return data;
+ }
+
+ public void setData(Data data) {
+ this.data = data;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "PostFileResponse{" +
+ "code=" + code +
+ ", data=" + data +
+ ", message='" + message + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/Source.java b/src/main/java/com/zdjizhi/syncfile/entity/Source.java
new file mode 100644
index 0000000..366787e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/Source.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.syncfile.entity;
+
+public class Source {
+
+ private String source_oss_path;
+ private String destination_oss_path;
+
+ public String getSource_oss_path() {
+ return source_oss_path;
+ }
+
+ public void setSource_oss_path(String source_oss_path) {
+ this.source_oss_path = source_oss_path;
+ }
+
+ public String getDestination_oss_path() {
+ return destination_oss_path;
+ }
+
+ public void setDestination_oss_path(String destination_oss_path) {
+ this.destination_oss_path = destination_oss_path;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java
new file mode 100644
index 0000000..d979f3a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.syncfile.entity;
+
+import java.util.List;
+
+public class SysFileSync {
+ private List<Source> sourceList;
+ private long common_log_id;
+ private long common_recv_time;
+ private String common_schema_type;
+ private long processing_time;
+
+ public List<Source> getSourceList() {
+ return sourceList;
+ }
+
+ public void setSourceList(List<Source> sourceList) {
+ this.sourceList = sourceList;
+ }
+
+ public long getCommon_log_id() {
+ return common_log_id;
+ }
+
+ public void setCommon_log_id(long common_log_id) {
+ this.common_log_id = common_log_id;
+ }
+
+ public String getCommon_schema_type() {
+ return common_schema_type;
+ }
+
+ public void setCommon_schema_type(String common_schema_type) {
+ this.common_schema_type = common_schema_type;
+ }
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public long getProcessing_time() {
+ return processing_time;
+ }
+
+ public void setProcessing_time(long processing_time) {
+ this.processing_time = processing_time;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java
new file mode 100644
index 0000000..942c0e6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java
@@ -0,0 +1,95 @@
+package com.zdjizhi.syncfile.monitor;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.servlet.*;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class LogChartMetricsFilter implements Filter {
+ @Autowired
+ private MonitorProperties monitorProperties;
+
+ private MeterRegistry registry;
+ private Map<String, Long> dashboardMap = new HashMap<>();
+ private Map<String, Long> errorTypeMap = new HashMap<>();
+
+ public LogChartMetricsFilter(MeterRegistry registry) {
+ this.registry = registry;
+ }
+
+ @Override
+ public void init(FilterConfig filterConfig) {
+ }
+
+ @Override
+ public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
+ HttpServletRequest req = (HttpServletRequest) servletRequest;
+ filterChain.doFilter(servletRequest, servletResponse);
+ if (req.getRequestURI().contains("/prometheus")) {
+ registryDashboardInfo();
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+
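+ /*
+ * Sketch of the series exposed on the Prometheus scrape endpoint (tag values mirror
+ * the map keys; the sample values are illustrative):
+ *
+ *   dashInfo{id="downloadFileSuccessCount",name="downloadFileSuccessCount",severity="downloadFileSuccessCount"} 42.0
+ *   error_type_total{id="fileSyncError",name="fileSyncError",severity="fileSyncError"} 3.0
+ */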
+ /**
+ * Refresh the dashboard gauges when the Prometheus endpoint is scraped
+ */
+ private void registryDashboardInfo() {
+ Long downloadFileSuccessCount = monitorProperties.getDownloadFileSuccessCount();
+ Long downloadFileErrorCount = monitorProperties.getDownloadFileErrorCount();
+ Long postFileSuccessCount = monitorProperties.getPostFileSuccessCount();
+ Long postFileErrorCount = monitorProperties.getPostFileErrorCount();
+ Long downloadFileSize = monitorProperties.getDownloadFileSize();
+ Long postFileSize = monitorProperties.getPostFileSize();
+
+ Long fileSyncError = monitorProperties.getFileSyncError();
+ Long hosError = monitorProperties.getHosError();
+ Long ossError = monitorProperties.getOssError();
+
+ dashboardMap.put("downloadFileSuccessCount", downloadFileSuccessCount);
+ dashboardMap.put("downloadFileErrorCount",downloadFileErrorCount);
+ dashboardMap.put("postFileSuccessCount", postFileSuccessCount);
+ dashboardMap.put("postFileErrorCount", postFileErrorCount);
+ dashboardMap.put("downloadFileSize", downloadFileSize);
+ dashboardMap.put("postFileSize", postFileSize);
+
+ errorTypeMap.put("fileSyncError",fileSyncError);
+ errorTypeMap.put("hosError",hosError);
+ errorTypeMap.put("ossError",ossError);
+
+ registryMetrics(dashboardMap,errorTypeMap);
+ }
+
+ private void registryMetrics(Map<String, Long> map, Map<String, Long> errorTypeMap) {
+ if (!ObjectUtils.isEmpty(map)) {
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ //the gauge reads the current value from the map on each scrape
+ Gauge.builder("dashInfo", map, x -> x.get(entry.getKey()))
+ .tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
+ .description("file sync service info")
+ .register(registry);
+ }
+ }
+ if (!ObjectUtils.isEmpty(errorTypeMap)) {
+ for (Map.Entry<String, Long> entry : errorTypeMap.entrySet()) {
+ //the gauge reads the current value from the map on each scrape
+ Gauge.builder("error_type_total", errorTypeMap, x -> x.get(entry.getKey()))
+ .tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
+ .description("Number of error type")
+ .register(registry);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java
new file mode 100644
index 0000000..d81dbd0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java
@@ -0,0 +1,81 @@
+package com.zdjizhi.syncfile.monitor;
+
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+@Configuration
+public class MonitorProperties {
+ //counters are updated from the Kafka listener threads and the worker pool, so use AtomicLong
+ private static final AtomicLong downloadFileSuccessCount = new AtomicLong();
+ private static final AtomicLong downloadFileErrorCount = new AtomicLong();
+ private static final AtomicLong postFileSuccessCount = new AtomicLong();
+ private static final AtomicLong postFileErrorCount = new AtomicLong();
+ private static final AtomicLong downloadFileSize = new AtomicLong();
+ private static final AtomicLong postFileSize = new AtomicLong();
+
+ private static final AtomicLong fileSyncError = new AtomicLong();
+ private static final AtomicLong hosError = new AtomicLong();
+ private static final AtomicLong ossError = new AtomicLong();
+
+ public void addDownloadFileSuccessCount() {
+ downloadFileSuccessCount.incrementAndGet();
+ }
+ public void addDownloadFileErrorCount() {
+ downloadFileErrorCount.incrementAndGet();
+ }
+ public void addPostFileSuccessCount() {
+ postFileSuccessCount.incrementAndGet();
+ }
+ public void addPostFileErrorCount() {
+ postFileErrorCount.incrementAndGet();
+ }
+ public void addDownloadFileSize(long fileSize) {
+ downloadFileSize.addAndGet(fileSize);
+ }
+ public void addPostFileSize(long fileSize) {
+ postFileSize.addAndGet(fileSize);
+ }
+ public void addFileSyncError() {
+ fileSyncError.incrementAndGet();
+ }
+ public void addHosError() {
+ hosError.incrementAndGet();
+ }
+ public void addOssError() {
+ ossError.incrementAndGet();
+ }
+
+ public Long getDownloadFileSuccessCount() {
+ return downloadFileSuccessCount.get();
+ }
+
+ public Long getDownloadFileErrorCount() {
+ return downloadFileErrorCount.get();
+ }
+
+ public Long getPostFileSuccessCount() {
+ return postFileSuccessCount.get();
+ }
+
+ public Long getPostFileErrorCount() {
+ return postFileErrorCount.get();
+ }
+
+ public Long getDownloadFileSize() {
+ return downloadFileSize.get();
+ }
+
+ public Long getPostFileSize() {
+ return postFileSize.get();
+ }
+
+ public Long getFileSyncError() {
+ return fileSyncError.get();
+ }
+
+ public Long getHosError() {
+ return hosError.get();
+ }
+
+ public Long getOssError() {
+ return ossError.get();
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
new file mode 100644
index 0000000..df8e6ce
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
@@ -0,0 +1,103 @@
+package com.zdjizhi.syncfile.utils;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.syncfile.entity.PostFileResponse;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStream;
+
+@Component
+public class HttpUtil {
+ private static Log log = LogFactory.get();
+
+ @Autowired
+ private CloseableHttpClient httpClient;
+ @Autowired
+ private RequestConfig requestConfig;
+ @Autowired
+ MonitorProperties monitorProperties;
+
+ public InputStream httpGetInputStream(String url) {
+ InputStream result = null;
+ CloseableHttpResponse response = null;
+ try {
+ HttpGet httpGet = new HttpGet(url);
+ httpGet.setConfig(requestConfig);
+ response = httpClient.execute(httpGet);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
+ log.info("get file success. current url: {}", url);
+ monitorProperties.addDownloadFileSuccessCount();
+ if (response.getFirstHeader("Content-Length") != null) {
+ monitorProperties.addDownloadFileSize(Long.parseLong(response.getFirstHeader("Content-Length").getValue()));
+ }
+ } else if (response.getStatusLine().getStatusCode() == 500) {
+ log.error("get file error. Hos service error, please check hos. current url: {}", url);
+ monitorProperties.addHosError();
+ } else {
+ log.error("get file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addFileSyncError();
+ }
+ } catch (Exception e) {
+ log.error("get file error. current url: {}, error: {}", url, e.toString());
+ monitorProperties.addFileSyncError();
+ } finally {
+ IoUtil.close(response);
+ }
+ return result;
+ }
+
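+ /*
+ * Sketch of the upload response body this method expects (field names follow
+ * PostFileResponse and Data; the values are illustrative):
+ *
+ * {"code": 200, "data": {"fileSize": 1024, "sign": "abc123", "state": "success"}, "message": "ok"}
+ */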
+ public boolean httpPostInputStream(String url, InputStream data) {
+ boolean isSuccess = false;
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setConfig(requestConfig);
+ httpPost.setEntity(new InputStreamEntity(data));
+ response = httpClient.execute(httpPost);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
+ JSONObject jsonObj = (JSONObject) JSON.parse(responseEntity);
+ if (jsonObj != null) {
+ //check the parsed response code rather than matching on the raw JSON text
+ if (jsonObj.getIntValue("code") == 200) {
+ PostFileResponse postFileResponse = JSON.toJavaObject(jsonObj, PostFileResponse.class);
+ isSuccess = true;
+ log.info("post file success. current url: {}, msg: {}", url, responseEntity);
+ monitorProperties.addPostFileSuccessCount();
+ monitorProperties.addPostFileSize(postFileResponse.getData().getFileSize());
+ } else {
+ log.error("post file error. current url: {}, msg: {}", url,responseEntity);
+ monitorProperties.addFileSyncError();
+ }
+ }else {
+ log.error("post file error, response body error. current url: {}", url);
+ monitorProperties.addOssError();
+ }
+ } else if (response.getStatusLine().getStatusCode() == 500) {
+ log.error("post file error. Oss service error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addOssError();
+ } else {
+ log.error("post file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addFileSyncError();
+ }
+ } catch (Exception e) {
+ log.error("post file error. current url: " + url, e);
+ monitorProperties.addFileSyncError();
+ } finally {
+ IoUtil.close(response);
+ }
+ return isSuccess;
+ }
+}