Initial commit

houjinchuan
2021-12-01 16:19:14 +08:00
parent 45eaa5af4a
commit 5b998bd30c
17 changed files with 1253 additions and 0 deletions

19
config/application.yml Normal file

@@ -0,0 +1,19 @@
nacos:
config:
type: yaml
server-addr: 192.168.44.12:8848
namespace: dev
data-id: p19-file-sync-service
auto-refresh: true
group: Galaxy
username: nacos
password: nacos
bootstrap:
enable: true
log:
enable: true
spring:
profiles:
active: dev
logging:
config: ./config/log4j2-dev.xml

56
config/log4j2-dev.xml Normal file

@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--Logging parameter definitions-->
<Properties>
<!--Roll the log file once it reaches LOG_SIZE; rolled files are gzip-compressed-->
<property name="LOG_SIZE">200M</property>
<!--Keep at most LOG_NUMS rolled files per rollover period-->
<property name="LOG_NUMS">10</property>
<!--Log level-->
<property name="LOG_LEVEL">info</property>
<!--Log directory-->
<property name="LOG_PATH">logs</property>
<!--Log file name-->
<property name="LOG_FILE_NAME">p19-file-sync-service</property>
<!--Log message pattern-->
<property name="LOG_PATTERN">[%d{yyyy-MM-dd HH:mm:ss}] [%p] [Thread:%t] %l %x - %m%n</property>
</Properties>
<appenders>
<Console name="consoleSystemOutAppender" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${LOG_PATTERN}"/>
</Console>
<RollingFile name="rollingFileAllAppender"
fileName="${LOG_PATH}/${LOG_FILE_NAME}.log"
filePattern="${LOG_PATH}/history/$${date:yyyy-MM-dd}/${LOG_FILE_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
<PatternLayout pattern="${LOG_PATTERN}"/>
<Policies>
<SizeBasedTriggeringPolicy size="${LOG_SIZE}"/>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
<Filters>
<ThresholdFilter level="all" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<DefaultRolloverStrategy max="${LOG_NUMS}">
<Delete basePath="${LOG_PATH}/history" maxDepth="1">
<IfFileName glob="*.log.gz">
<IfLastModified age="90d">
<IfAny>
<IfAccumulatedFileSize exceeds="200 GB" />
</IfAny>
</IfLastModified>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
</appenders>
<loggers>
<root level="${LOG_LEVEL}">
<appender-ref ref="consoleSystemOutAppender"/>
<appender-ref ref="rollingFileAllAppender"/>
</root>
</loggers>
</configuration>

24
docker/Dockerfile Normal file

@@ -0,0 +1,24 @@
ARG JDK_IMAGE
ARG GO_IMAGE
#Build stage: compile the xjar launcher binary
#FROM 192.168.40.153:9080/common/golang:1.15.6 as builder
FROM ${GO_IMAGE} as builder
WORKDIR /build
COPY xjar.go /build/
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o xjar .
FROM ${JDK_IMAGE}
LABEL maintainer="Galaxy"
VOLUME /tmp
WORKDIR /home/tsg/galaxy/p19-file-sync-service
COPY --from=builder /build .
COPY config config
ARG JAR_FILE
COPY ${JAR_FILE} p19-file-sync-service.xjar
#JAVA_OPTS can be overridden at runtime (e.g. from docker-compose)
ENV JAVA_OPTS=" -Xmx1g -Xms1g -Xmn128m -Xss256k -XX:MetaspaceSize=128m -XX:SurvivorRatio=2 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "
ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
#ENV TZ=Asia/Almaty
#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENTRYPOINT [ "sh", "-c", "./xjar java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar p19-file-sync-service.xjar" ]

269
pom.xml Normal file

@@ -0,0 +1,269 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>p19-file-sync-service</artifactId>
<version>21.12.01</version>
<name>p19-file-sync-service</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.7.RELEASE</version>
<relativePath/>
</parent>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public/</url>
</pluginRepository>
</pluginRepositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<nacos.config.version>0.2.8</nacos.config.version>
<docker.build>192.168.40.153</docker.build>
<docker.build.port>2375</docker.build.port>
<!--Docker registry host-->
<docker.registry>192.168.40.153</docker.registry>
<!--Docker registry port-->
<docker.registry.port>9080</docker.registry.port>
<docker.image.prefix>tsg/galaxy</docker.image.prefix>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.69</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.3</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>${nacos.config.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.5.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mesalab.xjar-maven-plugin</groupId>
<artifactId>mesalab-xjar-maven-plugin</artifactId>
<version>1.0.0</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
<phase>package</phase>
<configuration>
<password>Geedge2020!</password>
<excludes>
<exclude>
static/**
</exclude>
<exclude>
templates/**
</exclude>
<exclude>
resources/**
</exclude>
<exclude>
META-INF/resources/**
</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>1.0.0</version>
<configuration>
<serverId>153-docker-repo</serverId>
<registryUrl>${docker.registry}:${docker.registry.port}</registryUrl>
<!--Push the image to the registry (Harbor); if set to false, pass -DpushImage on the mvn command line to push-->
<pushImage>true</pushImage>
<!--Image name in the form registry/name:tag-->
<imageName>${docker.registry}:${docker.registry.port}/${docker.image.prefix}/${project.artifactId}</imageName>
<!--Overwrite an existing image with the same tag-->
<forceTags>true</forceTags>
<imageTags>
<imageTag>${project.version}</imageTag>
</imageTags>
<!--Remote Docker daemon address used to run the Dockerfile build-->
<dockerHost>http://${docker.build}:${docker.build.port}</dockerHost>
<!--Directory containing the Dockerfile-->
<dockerDirectory>docker</dockerDirectory>
<buildArgs>
<JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73-jre</JDK_IMAGE>
<GO_IMAGE>192.168.40.153:9080/common/golang:1.15.6</GO_IMAGE>
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<!--Copy the build artifacts into target/docker alongside the Dockerfile-->
<resources>
<resource>
<!-- Destination path inside the Docker build context (its root) -->
<targetPath>/</targetPath>
<!-- Source directory to copy from: the Maven target directory -->
<directory>${project.build.directory}</directory>
<!-- Files to copy: the generated xjar package and the Go launcher source -->
<include>${project.build.finalName}.xjar</include>
<include>xjar.go</include>
</resource>
<resource>
<targetPath>/config</targetPath>
<directory>config</directory>
</resource>
</resources>
</configuration>
</plugin>
</plugins>
</build>
</project>

19
src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java Normal file

@@ -0,0 +1,19 @@
package com.zdjizhi.syncfile;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class P19FileSyncServiceApplication {
public static void main(String[] args) {
SpringApplication.run(P19FileSyncServiceApplication.class, args);
}
@Bean
MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName){
return registry -> registry.config().commonTags("application", applicationName);
}
}
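
Not part of this commit: a minimal usage sketch showing how the common "application" tag registered above ends up on custom meters. The meter name file.sync.records and the component itself are hypothetical.

// Hypothetical sketch: any meter created through the auto-configured MeterRegistry
// carries the common "application" tag added by the customizer above.
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Component;

@Component
public class SyncMetricsExample {
    private final Counter records;

    public SyncMetricsExample(MeterRegistry registry) {
        // Exported to Prometheus roughly as
        // file_sync_records_total{application="<spring.application.name>", ...}
        this.records = Counter.builder("file.sync.records")
                .description("records consumed from Kafka")
                .register(registry);
    }

    public void onRecord() {
        records.increment();
    }
}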

125
src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java Normal file

@@ -0,0 +1,125 @@
package com.zdjizhi.syncfile.config;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
//@ConfigurationProperties(prefix = "http")
@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
public class HttpClientPool {
private Integer maxTotal;
private Integer defaultMaxPerRoute;
private Integer connectTimeout;
private Integer connectionRequestTimeout;
private Integer socketTimeout;
private boolean staleConnectionCheckEnabled;
public void setMaxTotal(Integer maxTotal) {
this.maxTotal = maxTotal;
}
public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) {
this.defaultMaxPerRoute = defaultMaxPerRoute;
}
public void setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
}
public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
this.connectionRequestTimeout = connectionRequestTimeout;
}
public void setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
}
public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) {
this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
}
/**
* Create the pooling connection manager and set the maximum total
* and per-route connection limits.
*
* @return the shared PoolingHttpClientConnectionManager
*/
@Bean(name = "httpClientConnectionManager")
public PoolingHttpClientConnectionManager getHttpClientConnectionManager() {
PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
//maximum total connections in the pool
httpClientConnectionManager.setMaxTotal(maxTotal);
//maximum connections per route
httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
return httpClientConnectionManager;
}
/**
* Create the HttpClientBuilder and attach the connection manager
* defined above, injected here as a method parameter.
*
* @param httpClientConnectionManager the pooling connection manager bean
* @return a builder pre-configured with the connection pool
*/
@Bean(name = "httpClientBuilder")
public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) {
//HttpClientBuilder's constructor is protected, so obtain an instance via the static factory method HttpClientBuilder.create() instead of new
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(httpClientConnectionManager);
return httpClientBuilder;
}
/**
* Build the shared CloseableHttpClient from the injected builder.
*
* @param httpClientBuilder the builder configured with the connection pool
* @return the pooled CloseableHttpClient
*/
@Bean
public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder) {
return httpClientBuilder.build();
}
/**
* RequestConfig.Builder is obtained through RequestConfig.custom().
* The connect, connection-request and socket timeouts are configured here;
* proxy, cookieSpec and other options could also be set if needed.
*
* @return a builder carrying the request-level timeout settings
*/
@Bean(name = "builder")
public RequestConfig.Builder getBuilder() {
RequestConfig.Builder builder = RequestConfig.custom();
return builder.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
}
/**
* Build the RequestConfig from the pre-configured builder.
*/
@Bean(name = "requestConfig")
public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) {
return builder.build();
}
}
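
For reference, a minimal sketch (assumed, not part of this commit) of consuming the beans defined above; HttpUtil later in this commit is the real consumer.

// Hypothetical sketch: issue a GET through the pooled CloseableHttpClient,
// applying the per-request timeouts built by HttpClientPool.
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PooledGetExample {
    @Autowired
    private CloseableHttpClient httpClient;
    @Autowired
    private RequestConfig requestConfig;

    public int statusOf(String url) throws Exception {
        HttpGet get = new HttpGet(url);
        get.setConfig(requestConfig); // apply connect/request/socket timeouts
        try (CloseableHttpResponse response = httpClient.execute(get)) {
            return response.getStatusLine().getStatusCode();
        }
    }
}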

126
src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java Normal file

@@ -0,0 +1,126 @@
package com.zdjizhi.syncfile.config;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import com.zdjizhi.syncfile.consumer.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
//@ConfigurationProperties(prefix = "kafka.consumer")
@NacosConfigurationProperties(prefix = "kafka.consumer", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
@EnableKafka
public class KafkaConsumerConfig {
private String servers;
private boolean enable_auto_commit;
private String session_timeout;
private String auto_commit_interval;
private String group_id;
private String auto_offset_reset;
private int poll_timeout;
private int concurrency;
private boolean batch_listener;
private int pool_record;
private String sasl_username;
private String sasl_password;
public void setServers(String servers) {
this.servers = servers;
}
public void setEnable_auto_commit(boolean enable_auto_commit) {
this.enable_auto_commit = enable_auto_commit;
}
public void setSession_timeout(String session_timeout) {
this.session_timeout = session_timeout;
}
public void setAuto_commit_interval(String auto_commit_interval) {
this.auto_commit_interval = auto_commit_interval;
}
public void setGroup_id(String group_id) {
this.group_id = group_id;
}
public void setAuto_offset_reset(String auto_offset_reset) {
this.auto_offset_reset = auto_offset_reset;
}
public void setPoll_timeout(int poll_timeout) {
this.poll_timeout = poll_timeout;
}
public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}
public void setBatch_listener(boolean batch_listener) {
this.batch_listener = batch_listener;
}
public void setPool_record(int pool_record) {
this.pool_record = pool_record;
}
public void setSasl_username(String sasl_username) {
this.sasl_username = sasl_username;
}
public void setSasl_password(String sasl_password) {
this.sasl_password = sasl_password;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(batch_listener);//enable batch consumption when configured
factory.getContainerProperties().setPollTimeout(poll_timeout);
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool_record);//maximum records returned per poll
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, session_timeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
propsMap.put("security.protocol", "SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");
propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
return propsMap;
}
@Bean
public KafkaConsumerListener listener() {
return new KafkaConsumerListener();
}
}
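
These settings can be sanity-checked outside Spring by feeding an equivalent property map to a plain KafkaConsumer; a sketch with assumed broker, topic and group values (the real ones live in the Nacos config).

// Hypothetical smoke test: the same ConsumerConfig keys built in consumerConfigs(),
// used directly with kafka-clients. SASL properties are omitted here and would be
// required against the real cluster.
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSmokeTest {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.44.12:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "p19-file-sync-smoke");         // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("file-sync"));           // assumed topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}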

31
src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java Normal file

@@ -0,0 +1,31 @@
package com.zdjizhi.syncfile.config;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Configuration
//@ConfigurationProperties(prefix = "thread")
@NacosConfigurationProperties(prefix = "thread", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
public class ThreadPoolFactory {
private Integer maxSize;
public void setMaxSize(Integer maxSize) {
this.maxSize = maxSize;
}
public Integer getMaxSize() {
return maxSize;
}
@Bean(name = "threadPool")
public ExecutorService getThreadPool() {
return Executors.newFixedThreadPool(maxSize);
}
}

59
src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java Normal file

@@ -0,0 +1,59 @@
package com.zdjizhi.syncfile.consumer;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.syncfile.core.SyncFiles;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.entity.SysFileSync;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class KafkaConsumerListener {
private Log log = LogFactory.get();
@Autowired
SyncFiles syncFiles;
@Autowired
MonitorProperties monitorProperties;
@KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
try {
List<List<Source>> fileList = new ArrayList<>();
for (ConsumerRecord<?, ?> record : records) {
log.info("消费kafka的数据的value: " + record.value().toString());
JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString());
SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class);
if (sysFileSync != null) {
List<Source> sourceList = sysFileSync.getSourceList();
if (sourceList == null || sourceList.isEmpty()) {
log.error("kafka data error, sourceList is null or empty. kafka data: " + record.value().toString());
monitorProperties.addFileSyncError();
}else {
fileList.add(sourceList);
}
}else {
log.error("parse kafka data error. kafka data: "+record.value().toString());
monitorProperties.addFileSyncError();
}
}
syncFiles.syncFiles(fileList);
} catch (Exception e) {
log.error("consume kafka data error.", e);
monitorProperties.addFileSyncError();
} finally {
ack.acknowledge();
}
}
}
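
A sketch of the message shape this listener expects, built with the entity classes added later in this commit; the paths and schema type are made-up example values.

// Hypothetical producer-side sketch: serialize a SysFileSync the way
// KafkaConsumerListener deserializes it with fastjson.
import com.alibaba.fastjson.JSON;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.entity.SysFileSync;
import java.util.Collections;

public class SampleMessage {
    public static void main(String[] args) {
        Source source = new Source();
        source.setSource_oss_path("http://hos.example:8080/bucket/a.pcap");      // assumed URL
        source.setDestination_oss_path("http://oss.example:8080/bucket/a.pcap"); // assumed URL
        SysFileSync msg = new SysFileSync();
        msg.setSourceList(Collections.singletonList(source));
        msg.setCommon_schema_type("file_sync");                                  // assumed value
        msg.setCommon_recv_time(System.currentTimeMillis());
        // Prints JSON along the lines of:
        // {"common_recv_time":...,"common_schema_type":"file_sync","sourceList":[{...}]}
        System.out.println(JSON.toJSONString(msg));
    }
}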

89
src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java Normal file

@@ -0,0 +1,89 @@
package com.zdjizhi.syncfile.core;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.syncfile.config.ThreadPoolFactory;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
import com.zdjizhi.syncfile.utils.HttpUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@Component
public class SyncFiles {
private Log log = LogFactory.get();
@Autowired
private HttpUtil httpUtil;
@Autowired
private ExecutorService threadPool;
@Autowired
private ThreadPoolFactory threadPoolFactory;
@Autowired
MonitorProperties monitorProperties;
public void syncFiles(List<List<Source>> fileList) {
List<Callable<Boolean>> callableList = new ArrayList<>();
try {
for (List<Source> sourceList : fileList) {
callableList.add(() -> {
boolean status = false;
InputStream content = null;
try {
for (Source source : sourceList) {
String source_oss_path = source.getSource_oss_path();
String destination_oss_path = source.getDestination_oss_path();
if (source_oss_path != null && !"".equals(source_oss_path)
&& destination_oss_path != null && !"".equals(destination_oss_path)) {
content = httpUtil.httpGetInputStream(source_oss_path);
if (content != null) {
boolean isSuccess = httpUtil.httpPostInputStream(destination_oss_path, content);
if (!isSuccess) {
log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path);
monitorProperties.addPostFileErrorCount();
return false;
} else {
status = true;
}
} else {
log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path);
monitorProperties.addDownloadFileErrorCount();
return false;
}
} else {
log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path);
monitorProperties.addFileSyncError();
return false;
}
}
} catch (Exception e) {
log.error("Sync file failed.", e);
monitorProperties.addFileSyncError();
status = false;
} finally {
IoUtil.close(content);
}
return status;
});
if (callableList.size() == threadPoolFactory.getMaxSize()) {
threadPool.invokeAll(callableList);
callableList.clear();
}
}
if (callableList.size() > 0) {
threadPool.invokeAll(callableList);
callableList.clear();
}
} catch (InterruptedException e) {
log.error("Sync files failed.", e);
monitorProperties.addFileSyncError();
Thread.currentThread().interrupt(); // restore the interrupt flag
}
}
}
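
The batching above leans on ExecutorService.invokeAll blocking until every task in the current batch has finished, which is what bounds in-flight transfers to the pool size. A standalone sketch of that behaviour (not project code):

// Standalone sketch: invokeAll submits a batch of Callables and returns
// only after all of them complete.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Boolean>> batch = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int id = i;
            batch.add(() -> {
                Thread.sleep(100); // simulate one file transfer
                return id % 2 == 0;
            });
        }
        List<Future<Boolean>> results = pool.invokeAll(batch); // blocks until the batch is done
        System.out.println("batch finished, futures: " + results.size());
        pool.shutdown();
    }
}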

40
src/main/java/com/zdjizhi/syncfile/entity/Data.java Normal file

@@ -0,0 +1,40 @@
package com.zdjizhi.syncfile.entity;
public class Data {
private int fileSize;
private String sign;
private String state;
public int getFileSize() {
return fileSize;
}
public void setFileSize(int fileSize) {
this.fileSize = fileSize;
}
public String getSign() {
return sign;
}
public void setSign(String sign) {
this.sign = sign;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
@Override
public String toString() {
return "Data{" +
"fileSize=" + fileSize +
", sign='" + sign + '\'' +
", state='" + state + '\'' +
'}';
}
}

43
src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java Normal file

@@ -0,0 +1,43 @@
package com.zdjizhi.syncfile.entity;
public class PostFileResponse {
private int code;
private Data data;
private String message;
public PostFileResponse() {
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public Data getData() {
return data;
}
public void setData(Data data) {
this.data = data;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "PostFileResponse{" +
"code=" + code +
", data=" + data +
", message='" + message + '\'' +
'}';
}
}

23
src/main/java/com/zdjizhi/syncfile/entity/Source.java Normal file

@@ -0,0 +1,23 @@
package com.zdjizhi.syncfile.entity;
public class Source {
private String source_oss_path;
private String destination_oss_path;
public String getSource_oss_path() {
return source_oss_path;
}
public void setSource_oss_path(String source_oss_path) {
this.source_oss_path = source_oss_path;
}
public String getDestination_oss_path() {
return destination_oss_path;
}
public void setDestination_oss_path(String destination_oss_path) {
this.destination_oss_path = destination_oss_path;
}
}

51
src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java Normal file

@@ -0,0 +1,51 @@
package com.zdjizhi.syncfile.entity;
import java.util.List;
public class SysFileSync {
private List<Source> sourceList;
private long common_log_id;
private long common_recv_time;
private String common_schema_type;
private long processing_time;
public List<Source> getSourceList() {
return sourceList;
}
public void setSourceList(List<Source> sourceList) {
this.sourceList = sourceList;
}
public long getCommon_log_id() {
return common_log_id;
}
public void setCommon_log_id(long common_log_id) {
this.common_log_id = common_log_id;
}
public String getCommon_schema_type() {
return common_schema_type;
}
public void setCommon_schema_type(String common_schema_type) {
this.common_schema_type = common_schema_type;
}
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public long getProcessing_time() {
return processing_time;
}
public void setProcessing_time(long processing_time) {
this.processing_time = processing_time;
}
}

95
src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java Normal file

@@ -0,0 +1,95 @@
package com.zdjizhi.syncfile.monitor;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Component
public class LogChartMetricsFilter implements Filter {
@Autowired
private MonitorProperties monitorProperties;
private MeterRegistry registry;
private Map<String, Long> dashboardMap = new HashMap<>();
private Map<String, Long> errorTypeMap = new HashMap<>();
public LogChartMetricsFilter(MeterRegistry registry) {
this.registry = registry;
}
@Override
public void init(FilterConfig filterConfig) {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) servletRequest;
filterChain.doFilter(servletRequest, servletResponse);
if (req.getRequestURI().contains("/prometheus")) {
registryDashboardInfo();
}
}
@Override
public void destroy() {
}
/**
* Refresh the dashboard gauges before each Prometheus scrape
*/
private void registryDashboardInfo() {
Long downloadFileSuccessCount = monitorProperties.getDownloadFileSuccessCount();
Long downloadFileErrorCount = monitorProperties.getDownloadFileErrorCount();
Long postFileSuccessCount = monitorProperties.getPostFileSuccessCount();
Long postFileErrorCount = monitorProperties.getPostFileErrorCount();
Long downloadFileSize = monitorProperties.getDownloadFileSize();
Long postFileSize = monitorProperties.getPostFileSize();
Long fileSyncError = monitorProperties.getFileSyncError();
Long hosError = monitorProperties.getHosError();
Long ossError = monitorProperties.getOssError();
dashboardMap.put("downloadFileSuccessCount", downloadFileSuccessCount);
dashboardMap.put("downloadFileErrorCount",downloadFileErrorCount);
dashboardMap.put("postFileSuccessCount", postFileSuccessCount);
dashboardMap.put("postFileErrorCount", postFileErrorCount);
dashboardMap.put("downloadFileSize", downloadFileSize);
dashboardMap.put("postFileSize", postFileSize);
errorTypeMap.put("fileSyncError",fileSyncError);
errorTypeMap.put("hosError",hosError);
errorTypeMap.put("ossError",ossError);
registryMetrics(dashboardMap,errorTypeMap);
}
private void registryMetrics(Map<String, Long> map ,Map<String, Long> errorTypeMap) {
if (!ObjectUtils.isEmpty(map)) {
for (Map.Entry<String, Long> entry : map.entrySet()) {
//the gauge reads the current value from the backing map on each scrape
Gauge.builder("dashInfo", map, x -> x.get(entry.getKey()))
.tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
.description("file sync service info")
.register(registry);
}
}
if (!ObjectUtils.isEmpty(errorTypeMap)) {
for (Map.Entry<String, Long> entry : errorTypeMap.entrySet()) {
//the gauge reads the current value from the backing error-type map on each scrape
Gauge.builder("error_type_total", errorTypeMap, x -> x.get(entry.getKey()))
.tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
.description("Number of error type")
.register(registry);
}
}
}
}

81
src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java Normal file

@@ -0,0 +1,81 @@
package com.zdjizhi.syncfile.monitor;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MonitorProperties {
private static Long downloadFileSuccessCount = 0L;
private static Long downloadFileErrorCount = 0L;
private static Long postFileSuccessCount = 0L;
private static Long postFileErrorCount = 0L;
private static Long downloadFileSize = 0L;
private static Long postFileSize = 0L;
private static Long fileSyncError = 0L;
private static Long hosError = 0L;
private static Long ossError = 0L;
public void addDownloadFileSuccessCount() {
downloadFileSuccessCount = downloadFileSuccessCount + 1;
}
public void addDownloadFileErrorCount() {
downloadFileErrorCount = downloadFileErrorCount + 1;
}
public void addPostFileSuccessCount() {
postFileSuccessCount = postFileSuccessCount + 1;
}
public void addPostFileErrorCount() {
postFileErrorCount = postFileErrorCount + 1;
}
public void addDownloadFileSize(long fileSize) {
downloadFileSize = downloadFileSize + fileSize;
}
public void addPostFileSize(long fileSize) {
postFileSize = postFileSize + fileSize;
}
public void addFileSyncError() {
fileSyncError = fileSyncError + 1;
}
public void addHosError() {
hosError = hosError + 1;
}
public void addOssError() {
ossError = ossError + 1;
}
public Long getDownloadFileSuccessCount() {
return downloadFileSuccessCount;
}
public Long getDownloadFileErrorCount() {
return downloadFileErrorCount;
}
public Long getPostFileSuccessCount() {
return postFileSuccessCount;
}
public Long getPostFileErrorCount() {
return postFileErrorCount;
}
public Long getDownloadFileSize() {
return downloadFileSize;
}
public Long getPostFileSize() {
return postFileSize;
}
public Long getFileSyncError() {
return fileSyncError;
}
public Long getHosError() {
return hosError;
}
public Long getOssError() {
return ossError;
}
}
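
The counters above are plain static longs incremented from the Kafka listener threads and the sync worker pool, so increments can be lost under contention. If exact counts matter, an AtomicLong-based variant is one option; a sketch covering two of the counters (not part of this commit):

// Hypothetical thread-safe variant of the download counters using AtomicLong.
import java.util.concurrent.atomic.AtomicLong;

public class AtomicMonitorSketch {
    private final AtomicLong downloadFileSuccessCount = new AtomicLong();
    private final AtomicLong downloadFileSize = new AtomicLong();

    public void addDownloadFileSuccessCount() {
        downloadFileSuccessCount.incrementAndGet(); // atomic, no lost updates
    }

    public void addDownloadFileSize(long fileSize) {
        downloadFileSize.addAndGet(fileSize);
    }

    public long getDownloadFileSuccessCount() {
        return downloadFileSuccessCount.get();
    }

    public long getDownloadFileSize() {
        return downloadFileSize.get();
    }
}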

103
src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java Normal file

@@ -0,0 +1,103 @@
package com.zdjizhi.syncfile.utils;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.syncfile.entity.PostFileResponse;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStream;
@Component
public class HttpUtil {
private static Log log = LogFactory.get();
@Autowired
private CloseableHttpClient httpClient;
@Autowired
private RequestConfig requestConfig;
@Autowired
MonitorProperties monitorProperties;
public InputStream httpGetInputStream(String url) {
InputStream result = null;
CloseableHttpResponse response = null;
try {
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(requestConfig);
response = httpClient.execute(httpGet);
if (response.getStatusLine().getStatusCode() == 200) {
result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
log.info("get file success. current url: {}", url);
monitorProperties.addDownloadFileSuccessCount();
monitorProperties.addDownloadFileSize(Integer.parseInt(response.getFirstHeader("Content-Length").getValue()));
}else if (response.getStatusLine().getStatusCode() == 500){
log.error("get file error. Hos service error, please check hos. current url: {}", url);
monitorProperties.addHosError();
} else {
log.error("get file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
monitorProperties.addFileSyncError();
}
} catch (Exception e) {
log.error("get file error. current url: {}, error: {}", url, e.toString());
monitorProperties.addFileSyncError();
} finally {
IoUtil.close(response);
}
return result;
}
public boolean httpPostInputStream(String url, InputStream data) {
boolean isSuccess = false;
CloseableHttpResponse response = null;
try {
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
httpPost.setEntity(new InputStreamEntity(data));
response = httpClient.execute(httpPost);
if (response.getStatusLine().getStatusCode() == 200) {
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
JSONObject jsonObj = (JSONObject) JSON.parse(responseEntity);
if (jsonObj != null) {
PostFileResponse postFileResponse = JSON.toJavaObject(jsonObj, PostFileResponse.class);
//check the parsed code field instead of matching the raw string, which is whitespace-sensitive
if (postFileResponse.getCode() == 200) {
isSuccess = true;
log.info("post file success. current url: {}, msg: {}", url, responseEntity);
monitorProperties.addPostFileSuccessCount();
monitorProperties.addPostFileSize(postFileResponse.getData().getFileSize());
} else {
log.error("post file error. current url: {}, msg: {}", url, responseEntity);
monitorProperties.addFileSyncError();
}
}else {
log.error("post file error, response body error. current url: {}", url);
monitorProperties.addOssError();
}
} else if(response.getStatusLine().getStatusCode() == 500){
log.error("post file error. Oss service error.current url: {}, code: 500, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
monitorProperties.addOssError();
}else {
log.error("post file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
monitorProperties.addFileSyncError();
}
} catch (Exception e) {
log.error("post file error. current url: " + url, e);
monitorProperties.addFileSyncError();
} finally {
IoUtil.close(response);
}
return isSuccess;
}
}