1. 添加AuditAdvice类,用以向审计接口持续发送用户操作数据
2. 添加任务结束状态Scheduled方法,用以周期性扫库将任务修改为已结束状态
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,6 +1,7 @@
|
||||
HELP.md
|
||||
.gradle
|
||||
build/
|
||||
./log/
|
||||
!gradle/wrapper/gradle-wrapper.jar
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
@@ -25,6 +25,7 @@ dependencies {
|
||||
// SpringBoot原生依赖
|
||||
implementation 'org.springframework.boot:spring-boot-starter-web'
|
||||
implementation 'org.springframework.boot:spring-boot-starter-validation'
|
||||
implementation 'org.springframework.boot:spring-boot-starter-webflux'
|
||||
implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.3'
|
||||
implementation 'org.springframework.boot:spring-boot-starter-actuator'
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.realtime.protection;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ProtectionApplication {
|
||||
|
||||
@@ -56,7 +56,7 @@ public class ProtectObject {
|
||||
|
||||
@JsonProperty("proobj_audit_status")
|
||||
@ExcelIgnore
|
||||
@Schema(description = "防护对象审核状态(0为未审核,1为已退回,2为审核通过)", example = "2")
|
||||
@Schema(description = "防护对象审核状态(0为未审核,1为已退回,2为审核通过)", accessMode = Schema.AccessMode.READ_ONLY)
|
||||
private Integer protectObjectAuditStatus;
|
||||
|
||||
@JsonProperty("proobj_create_username")
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
package com.realtime.protection.configuration.entity.user;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class User {
|
||||
private int userID;
|
||||
private Long userId;
|
||||
|
||||
private Long deptId;
|
||||
|
||||
private String username;
|
||||
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
package com.realtime.protection.configuration.response;
|
||||
|
||||
import com.realtime.protection.ProtectionApplication;
|
||||
import com.realtime.protection.configuration.entity.user.User;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.converter.HttpMessageConverter;
|
||||
import org.springframework.http.server.ServerHttpRequest;
|
||||
import org.springframework.http.server.ServerHttpResponse;
|
||||
import org.springframework.web.bind.annotation.RestControllerAdvice;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.reactive.function.client.WebClientRequestException;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@RestControllerAdvice(basePackageClasses = {ProtectionApplication.class})
|
||||
@Slf4j
|
||||
public class AuditAdvice implements ResponseBodyAdvice<ResponseResult> {
|
||||
|
||||
private final WebClient webClient = WebClient
|
||||
.builder()
|
||||
.baseUrl("http://39.105.210.156:8090/chanct-log/audit-xgs")
|
||||
.build();
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class AuditData {
|
||||
private Long userId;
|
||||
private Long deptId;
|
||||
private String userName;
|
||||
private String deptName;
|
||||
private String menu;
|
||||
private String action;
|
||||
private String res;
|
||||
private String content;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseResult beforeBodyWrite(ResponseResult body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
|
||||
User user = getUserData(request);
|
||||
|
||||
AuditData auditData = new AuditData(
|
||||
user.getUserId(), user.getDeptId(), user.getUsername(), user.getUserDepart(),
|
||||
"",
|
||||
Objects.requireNonNull(returnType.getMethod()).getName(),
|
||||
success(body),
|
||||
body.getMessage()
|
||||
);
|
||||
|
||||
|
||||
Mono<String> mono = webClient
|
||||
.post()
|
||||
.uri("/save")
|
||||
.bodyValue(auditData)
|
||||
.exchangeToMono(res -> {
|
||||
if (res.statusCode().equals(HttpStatus.OK)) {
|
||||
return res.bodyToMono(String.class);
|
||||
}
|
||||
|
||||
return null;
|
||||
})
|
||||
.doOnError(WebClientRequestException.class, err ->
|
||||
log.warn("审计服务器遭遇异常" + err.getMessage()));
|
||||
|
||||
mono.subscribe(AuditAdvice::handleMono);
|
||||
|
||||
return body;
|
||||
}
|
||||
|
||||
private User getUserData(ServerHttpRequest request) {
|
||||
return new User(1L, 1L, "xxx", "", "xxx");
|
||||
}
|
||||
|
||||
private String success(ResponseResult body) {
|
||||
return Boolean.toString(body.getCode() == 200);
|
||||
}
|
||||
|
||||
private static void handleMono(String result) {
|
||||
log.debug("审计服务器返回结果:" + result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,5 +18,9 @@ public interface CommandMapper {
|
||||
|
||||
Boolean startCommandsByTaskId(@Param("task_id") Long taskId);
|
||||
|
||||
Boolean setCommandValid(@Param("command_id") String commandId);
|
||||
|
||||
Boolean setCommandInvalid(@Param("command_id") String commandId);
|
||||
|
||||
List<TaskCommandInfo> queryCommandInfoByTaskId(@Param("task_id") Long taskId);
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ public class CommandService {
|
||||
};
|
||||
|
||||
sqlSessionWrapper.startBatchSession(CommandMapper.class, function, taskCommandInfos);
|
||||
|
||||
}
|
||||
|
||||
public List<TaskCommandInfo> queryCommandInfoByTaskId(Long taskId) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.realtime.protection.configuration.entity.task.Task;
|
||||
import com.realtime.protection.configuration.entity.task.TaskCommandInfo;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -46,4 +47,7 @@ public interface TaskMapper {
|
||||
List<Integer> queryDynamicRuleIdsFromTaskId(@Param("task_id") Long taskId);
|
||||
|
||||
List<Integer> queryStaticRuleIdsFromTaskId(@Param("task_id") Long taskId);
|
||||
|
||||
@Select("SELECT task_id FROM t_task WHERE task_end_time < NOW() AND task_status != #{task_status}")
|
||||
List<Long> queryTasksByStatus(@Param("task_status") Integer taskStatus);
|
||||
}
|
||||
|
||||
@@ -3,13 +3,16 @@ package com.realtime.protection.server.task;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.realtime.protection.configuration.entity.task.Task;
|
||||
import com.realtime.protection.configuration.entity.task.TaskCommandInfo;
|
||||
import com.realtime.protection.configuration.utils.enums.StateEnum;
|
||||
import com.realtime.protection.configuration.utils.status.AuditStatusValidator;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@DS("mysql")
|
||||
public class TaskService {
|
||||
private final TaskMapper taskMapper;
|
||||
@@ -119,4 +122,8 @@ public class TaskService {
|
||||
taskMapper.newTaskUsingCommandInfo(taskCommandInfo);
|
||||
return taskCommandInfo.getTaskId();
|
||||
}
|
||||
|
||||
public List<Long> getFinishedTasks() {
|
||||
return taskMapper.queryTasksByStatus(StateEnum.FINISHED.getStateNum());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,11 +7,17 @@ import com.realtime.protection.configuration.utils.status.State;
|
||||
import com.realtime.protection.server.command.CommandService;
|
||||
import com.realtime.protection.server.task.TaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Service
|
||||
@EnableScheduling
|
||||
@Slf4j
|
||||
public class StateChangeService {
|
||||
private final CommandService commandService;
|
||||
@@ -73,4 +79,22 @@ public class StateChangeService {
|
||||
// 我们需要保证只有任务创建函数才能将GENERATING状态转换为RUNNING状态
|
||||
return !Objects.equals(originalState, StateEnum.GENERATING.getState());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0/2 * * * ?")
|
||||
@Transactional
|
||||
@Async
|
||||
protected void finishTasks() {
|
||||
List<Long> finishedTaskIds = taskService.getFinishedTasks();
|
||||
log.debug("成功扫描出所有需要变为结束状态的任务:" + finishedTaskIds);
|
||||
|
||||
for (Long taskId : finishedTaskIds) {
|
||||
try {
|
||||
changeState(StateEnum.FINISHED.getStateNum(), taskId, true);
|
||||
} catch (Exception e) {
|
||||
log.warn(String.format("任务%d从%s状态变为FINISHED状态遭遇异常:%s",
|
||||
taskId, taskService.queryTaskStatus(taskId), e.getMessage()));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,15 +62,19 @@ public class StateHandler {
|
||||
return true;
|
||||
}
|
||||
|
||||
// todo: 如果是实时任务或者研判后处置任务,那么就需要在任务启动之后,立刻向动态规则中指定的系统发送日志筛选请求。
|
||||
// 筛选完成后,系统返回日志,需要由接收端点提取字段,并且合成一条静态规则,再按照任务开始时间、结束时间和任务类型进行指令创建
|
||||
private Boolean handleJudgedTaskStart(CommandService commandService, TaskService taskService, Long taskId) {
|
||||
// todo: 研判后处置任务的指令的is_valid字段一开始需要设置为false
|
||||
return true;
|
||||
}
|
||||
|
||||
private Boolean handleDynamicTaskStart(CommandService commandService, TaskService taskService, Long taskId) {
|
||||
// todo: 实时任务的指令的is_valid字段一开始需要设置为true
|
||||
return true;
|
||||
}
|
||||
|
||||
private Boolean handleStaticTaskStart(CommandService commandService, TaskService taskService, Long taskId) throws DorisStartException {
|
||||
private Boolean handleStaticTaskStart(CommandService commandService, TaskService taskService, Long taskId) {
|
||||
// 如果未能获取staticTaskCommandInfos,需要报错
|
||||
List<TaskCommandInfo> staticTaskCommandInfos = taskService.getStaticCommandInfos(taskId);
|
||||
if (staticTaskCommandInfos == null || staticTaskCommandInfos.isEmpty()) {
|
||||
|
||||
@@ -13,6 +13,7 @@ public class FailedState extends StateHandler implements State {
|
||||
return switch (StateEnum.getStateEnumByState(newState)) {
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case STOP -> handleStop(commandService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ public class GeneratingState extends StateHandler implements State {
|
||||
return switch (StateEnum.getStateEnumByState(newState)) {
|
||||
case RUNNING -> true;
|
||||
case FAILED -> handleFailed(commandService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ public class PendingState extends StateHandler implements State {
|
||||
return switch (StateEnum.getStateEnumByState(newState)) {
|
||||
case FAILED -> handleFailed(commandService, taskId);
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ public class StopState extends StateHandler implements State {
|
||||
return switch (StateEnum.getStateEnumByState(newState)) {
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case FAILED -> handleFailed(commandService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ logging:
|
||||
level:
|
||||
com.realtime.protection: debug
|
||||
file:
|
||||
name: classpath:log/realtime_protection.log
|
||||
name: log/realtime_protection.log
|
||||
|
||||
spring:
|
||||
datasource:
|
||||
|
||||
@@ -3,9 +3,9 @@ server:
|
||||
|
||||
logging:
|
||||
level:
|
||||
com.realtime.protection: warning
|
||||
com.realtime.protection: warn
|
||||
file:
|
||||
name: classpath:log/realtime_protection.log
|
||||
name: log/realtime_protection.log
|
||||
|
||||
spring:
|
||||
datasource:
|
||||
|
||||
@@ -5,7 +5,7 @@ logging:
|
||||
level:
|
||||
com.realtime.protection: info
|
||||
file:
|
||||
name: classpath:log/realtime_protection.log
|
||||
name: log/realtime_protection.log
|
||||
|
||||
spring:
|
||||
datasource:
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
spring:
|
||||
config:
|
||||
import: classpath:config/application-test.yml
|
||||
profiles:
|
||||
active: test
|
||||
@@ -99,4 +99,20 @@
|
||||
WHERE TASK_ID = #{task_id}
|
||||
AND IS_DELETED = FALSE
|
||||
</update>
|
||||
|
||||
<update id="setCommandValid">
|
||||
UPDATE t_command
|
||||
SET IS_VALID = TRUE,
|
||||
LAST_UPDATE = NOW()
|
||||
WHERE COMMAND_ID = #{command_id}
|
||||
AND IS_DELETED = FALSE
|
||||
</update>
|
||||
|
||||
<update id="setCommandInvalid">
|
||||
UPDATE t_command
|
||||
SET IS_VALID = FALSE,
|
||||
LAST_UPDATE = NOW()
|
||||
WHERE COMMAND_ID = #{command_id}
|
||||
AND IS_DELETED = FALSE
|
||||
</update>
|
||||
</mapper>
|
||||
|
||||
Reference in New Issue
Block a user