1、动态任务增加发送指令下发信号
2、动态规则批量删除增加使用中判断 3、静态规则返回字段为‘’自动处理为null 4、修复防护对象查询返回ip没有转为ip格式错误
This commit is contained in:
@@ -7,6 +7,7 @@ import com.realtime.protection.configuration.entity.task.TaskCommandInfo;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
import org.apache.ibatis.annotations.Update;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
@@ -114,4 +115,11 @@ public interface TaskMapper {
|
||||
void updateTaskStatusLogExpireTimeBatch(List<Integer> taskIds);
|
||||
|
||||
List<Task> queryHistory(Long id, Integer page, Integer pageSize);
|
||||
|
||||
@Select("SELECT task_id FROM t_task WHERE task_start_time >= NOW() " +
|
||||
"AND task_status = #{stateNum} AND task_audit_status = #{AuditNum}")
|
||||
List<Long> queryRunnableTasks(Integer stateNum, Integer AuditNum);
|
||||
|
||||
@Update("UPDATE t_task SET task_start_time = NOW() WHERE task_id = #{taskId}")
|
||||
void updateTaskStartTime(Long taskId);
|
||||
}
|
||||
|
||||
@@ -577,4 +577,12 @@ public class TaskService {
|
||||
dynamicRuleMapper.insertStatusLogBatch(ids);
|
||||
|
||||
}
|
||||
|
||||
public List<Long> getRunnableTasks() {
|
||||
return taskMapper.queryRunnableTasks(StateEnum.PENDING.getStateNum(),AuditStatusEnum.AUDITED.getNum());
|
||||
}
|
||||
|
||||
public void updateTaskStartTime(Long taskId) {
|
||||
taskMapper.updateTaskStartTime(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ public class StateChangeService {
|
||||
return false;
|
||||
}
|
||||
|
||||
log.debug(String.format("成功使得task(%d)从%s切换为%s",
|
||||
log.info(String.format("成功使得task(%d)从%s切换为%s",
|
||||
taskId,
|
||||
originalState.getClass().getSimpleName(),
|
||||
newState.getClass().getSimpleName()));
|
||||
@@ -118,24 +118,24 @@ public class StateChangeService {
|
||||
}
|
||||
}
|
||||
|
||||
// /**
|
||||
// * 将任务切换为开始状态
|
||||
// */
|
||||
// @Scheduled(cron = "0/10 * * * * ?")
|
||||
// @Async
|
||||
// protected void startTasks() {
|
||||
// List<Long> startedTaskIds = taskService.getFinishedTasks();
|
||||
// log.debug("成功扫描出所有需要变为开始状态的任务:{}", startedTaskIds);
|
||||
//
|
||||
// for (Long taskId : startedTaskIds) {
|
||||
// try {
|
||||
// changeState(StateEnum.RUNNING.getStateNum(), taskId, true);
|
||||
// } catch (Exception e) {
|
||||
// log.warn(String.format("任务%d从%s状态变为运行中RUNNING状态遭遇异常:%s",
|
||||
// taskId, taskService.queryTaskStatus(taskId), e.getMessage()));
|
||||
// }
|
||||
//
|
||||
// }
|
||||
// }
|
||||
/**
|
||||
* 将任务切换为开始状态
|
||||
*/
|
||||
@Scheduled(cron = "0/10 * * * * ?")
|
||||
@Async
|
||||
protected void startTasks() {
|
||||
List<Long> runnableTaskIds = taskService.getRunnableTasks();
|
||||
log.debug("成功扫描出所有需要变为开始状态的任务:{}", runnableTaskIds);
|
||||
|
||||
for (Long taskId : runnableTaskIds) {
|
||||
try {
|
||||
changeState(StateEnum.RUNNING.getStateNum(), taskId, true);
|
||||
} catch (Exception e) {
|
||||
log.warn(String.format("任务%d从%s状态变为运行中RUNNING状态遭遇异常:%s",
|
||||
taskId, taskService.queryTaskStatus(taskId), e.getMessage()));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,17 +10,21 @@ import com.realtime.protection.server.command.CommandService;
|
||||
import com.realtime.protection.server.task.TaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.client.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class StateHandler {
|
||||
|
||||
|
||||
@@ -38,6 +42,7 @@ public class StateHandler {
|
||||
protected Boolean handleStart(TaskService taskService, CommandService commandService, Long taskId) {
|
||||
Task task = taskService.queryTask(taskId);
|
||||
|
||||
|
||||
if (task == null) {
|
||||
throw new IllegalArgumentException("无效task_id,因为无法找到对应任务");
|
||||
}
|
||||
@@ -52,6 +57,16 @@ public class StateHandler {
|
||||
if (!taskAuditStatus.equals(AuditStatusEnum.AUDITED.getNum())) {
|
||||
throw new IllegalArgumentException("无效的task_id,因为未通过审核");
|
||||
}
|
||||
/*
|
||||
什么时候会调用这个函数呢,
|
||||
1是周期函数判断出来当前时间超过了任务开始时间,且运行状态处于PENDING,审批状态为AUDITED
|
||||
2是走http接口调用,这时候有可能是任务开始时间已经过了,也有可能是任务开始时间还没到
|
||||
任务开始时间已经过了,周期性函数已经执行转变为RUNNNING状态了,前端再请求启动。 会报错
|
||||
所以只能是任务开始时间还没到,这时候“提前启动”就要设置任务开始时间为当前时间
|
||||
*/
|
||||
if (task.getTaskStartTime().isAfter(LocalDateTime.now())) {
|
||||
taskService.updateTaskStartTime(taskId);
|
||||
}
|
||||
|
||||
return switch (TaskTypeEnum.getTaskTypeByNum(task.getTaskType())) {
|
||||
case STATIC -> handleStaticTaskStart(commandService, taskService, task);
|
||||
@@ -115,8 +130,12 @@ public class StateHandler {
|
||||
private Boolean handleDynamicTaskStart(TaskService taskService, Task task) {
|
||||
// 将所有关联的动态规则审批状态修改为“已使用”
|
||||
taskService.updateDynamicRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.USING);
|
||||
|
||||
return sendFilters(taskService, task);
|
||||
try{
|
||||
return sendFilters(taskService, task);
|
||||
} catch (Exception e) {
|
||||
log.error("动态任务筛选条件发送出错", e);
|
||||
return true;
|
||||
}
|
||||
// return true;
|
||||
}
|
||||
|
||||
@@ -136,7 +155,7 @@ public class StateHandler {
|
||||
sendCommandDistributeSignal(commandUUIDs);
|
||||
return true;
|
||||
}
|
||||
private Boolean sendCommandDistributeSignal(List<String> commandUUIDs) {
|
||||
public Boolean sendCommandDistributeSignal(List<String> commandUUIDs) {
|
||||
|
||||
List<Map<String, String>> commandIDMaps = new ArrayList<>();
|
||||
for (String commandUUID : commandUUIDs) {
|
||||
@@ -149,6 +168,7 @@ public class StateHandler {
|
||||
Mono<Map> mono = client_commandDistribute.post()
|
||||
.uri("/rule")
|
||||
.bodyValue(commandIDMaps)
|
||||
.accept(MediaType.APPLICATION_JSON) // 设置Accept头为application/json
|
||||
.exchangeToMono(res -> {
|
||||
if (res.statusCode().equals(HttpStatus.OK)) {
|
||||
return res.bodyToMono(Map.class);
|
||||
|
||||
@@ -13,7 +13,7 @@ public class FailedState extends StateHandler implements State {
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case STOP -> handleStop(commandService, taskService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ public class FinishedState extends StateHandler implements State {
|
||||
public Boolean handle(State newState, CommandService commandService, TaskService taskService, Long taskId) {
|
||||
return switch (StateEnum.getStateEnumByState(newState)) {
|
||||
case PENDING, FINISHED -> true;
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ public class GeneratingState extends StateHandler implements State {
|
||||
case RUNNING -> true;
|
||||
case FAILED -> handleFailed(commandService, taskService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ public class PauseState extends StateHandler implements State {
|
||||
case STOP -> handleStop(commandService, taskService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
case FAILED -> handleFailed(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ public class PendingState extends StateHandler implements State {
|
||||
case FAILED -> handleFailed(commandService, taskService, taskId);
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException(taskId + " meets unexpected value: "
|
||||
default -> throw new IllegalStateException("错误的状态: "
|
||||
+ StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ public class RunningState extends StateHandler implements State {
|
||||
case STOP -> handleStop(commandService, taskService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
case FAILED -> handleFailed(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ public class StopState extends StateHandler implements State {
|
||||
case RUNNING -> handleStart(taskService, commandService, taskId);
|
||||
case FAILED -> handleFailed(commandService, taskService, taskId);
|
||||
case FINISHED -> handleFinish(commandService, taskService, taskId);
|
||||
default -> throw new IllegalStateException("Unexpected value: " + StateEnum.getStateEnumByState(newState));
|
||||
default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user