Merge remote-tracking branch 'origin/master' into haskafka

This commit is contained in:
PushM
2024-04-25 22:30:08 +08:00
7 changed files with 101 additions and 31 deletions

View File

@@ -1,5 +1,6 @@
package com.realtime.protection.configuration.entity.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.time.LocalDateTime;
@@ -10,23 +11,44 @@ public class DynamicTaskInfo {
@Data
private static class SimpleProtectObject {
@JsonProperty("ip")
private String IP;
@JsonProperty("port")
private Integer port;
@JsonProperty("url")
private String URL;
@JsonProperty("protocol")
private String protocol;
}
// 从任务中获取
@JsonProperty("task_id")
private Long taskId;
@JsonProperty("start_time")
private LocalDateTime startTime;
@JsonProperty("end_time")
private LocalDateTime endTime;
// 从规则中获取
@JsonProperty("rule_id")
private Integer ruleId;
@JsonProperty("source_system")
private String sourceSystem;
@JsonProperty("event_type")
private String eventType;
@JsonProperty("log_rule_id")
private Long logRuleId;
// 从防护对象列表中获取
@JsonProperty("protect_objects")
private List<SimpleProtectObject> protectObjects;
}

View File

@@ -80,7 +80,7 @@ public class TaskController implements TaskControllerApi {
List<Task> tasks = taskService.queryTasks(taskStatus, taskType, taskName, taskCreator, auditStatus, page, pageSize);
return ResponseResult.ok()
.setData("task_list", tasks)
.setData("total_num", taskService.queryTaskTotalNum(taskStatus, taskType, taskName, taskCreator));
.setData("total_num", taskService.queryTaskTotalNum(taskStatus, taskType, taskName, taskCreator, auditStatus));
}
@Override
@@ -192,9 +192,9 @@ public class TaskController implements TaskControllerApi {
@GetMapping("/statistics")
public ResponseResult statistics() {
return ResponseResult.ok()
.setData("total_num", taskService.queryTaskTotalNum(null, null, null, null))
.setData("running_num", taskService.queryTaskTotalNum(StateEnum.RUNNING.getStateNum(), null, null, null))
.setData("finished_num", taskService.queryTaskTotalNum(StateEnum.FINISHED.getStateNum(), null, null, null))
.setData("total_num", taskService.queryTaskTotalNum(null, null, null, null, null))
.setData("running_num", taskService.queryTaskTotalNum(StateEnum.RUNNING.getStateNum(), null, null, null, null))
.setData("finished_num", taskService.queryTaskTotalNum(StateEnum.FINISHED.getStateNum(), null, null, null, null))
.setData("unaudit_num", taskService.queryAuditTaskTotalNum(
AuditStatusEnum.PENDING.getNum()
));

View File

@@ -49,16 +49,19 @@ public interface TaskMapper {
Integer queryTaskStatus(@Param("task_id") Long taskId);
List<Integer> queryDynamicRuleIdsFromTaskId(@Param("task_id") Long taskId);
List<Integer> queryDynamicRuleIdsFromTaskId(@Param("task_id") Long taskId,
@Param("audit_status_list") List<Integer> auditStatusList);
List<Integer> queryStaticRuleIdsFromTaskId(@Param("task_id") Long taskId);
List<Integer> queryStaticRuleIdsFromTaskId(@Param("task_id") Long taskId,
@Param("audit_status_list") List<Integer> auditStatusList);
@Select("SELECT task_id FROM t_task WHERE task_end_time < NOW() AND task_status != #{task_status}")
List<Long> queryTasksByStatus(@Param("task_status") Integer taskStatus);
Integer queryTaskTotalNum(@Param("task_status") Integer taskStatus, @Param("task_type") Integer task_type,
@Param("task_name") String taskName, @Param("task_creator") String taskCreator);
@Param("task_name") String taskName, @Param("task_creator") String taskCreator,
@Param("audit_status") Integer auditStatus);
void updateAuditStatusByIdBatch(@Param("idWithAuditStatusBatch") Map<Integer, Integer> idWithAuditStatusBatch);

View File

@@ -92,7 +92,13 @@ public class TaskService {
return;
}
List<StaticRuleObject> staticRuleObjects = staticRuleMapper.queryStaticRuleByIds(taskMapper.queryDynamicRuleIdsFromTaskId(taskId));
List<Integer> staticRuleIds = taskMapper.queryStaticRuleIdsFromTaskId(taskId,
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum()));
if (staticRuleIds == null || staticRuleIds.isEmpty()) {
return;
}
List<StaticRuleObject> staticRuleObjects = staticRuleMapper.queryStaticRuleByIds(staticRuleIds);
if (staticRuleObjects == null || staticRuleObjects.isEmpty()) {
throw new IllegalArgumentException("静态规则列表中的ID不存在请检查静态规则是否真实存在");
}
@@ -147,9 +153,15 @@ public class TaskService {
return;
}
List<DynamicRuleObject> dynamicRuleObjects = dynamicRuleMapper.queryDynamicRuleByIds(taskMapper.queryDynamicRuleIdsFromTaskId(taskId));
List<Integer> dynamicRuleIds = taskMapper.queryDynamicRuleIdsFromTaskId(taskId,
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum()));
if (dynamicRuleIds == null || dynamicRuleIds.isEmpty()) {
return;
}
List<DynamicRuleObject> dynamicRuleObjects = dynamicRuleMapper.queryDynamicRuleByIds(dynamicRuleIds);
if (dynamicRuleObjects == null || dynamicRuleObjects.isEmpty()) {
throw new IllegalArgumentException("态规则列表中的ID不存在请检查态规则是否真实存在");
throw new IllegalArgumentException("态规则列表中的ID不存在请检查态规则是否真实存在");
}
// 检查所有的动态规则列表的审批状态是否正确,如不正确则报错
@@ -197,8 +209,10 @@ public class TaskService {
if (task == null) {
continue;
}
task.setStaticRuleIds(taskMapper.queryStaticRuleIdsFromTaskId(task.getTaskId()));
task.setDynamicRuleIds(taskMapper.queryDynamicRuleIdsFromTaskId(task.getTaskId()));
task.setStaticRuleIds(taskMapper.queryStaticRuleIdsFromTaskId(task.getTaskId(),
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum())));
task.setDynamicRuleIds(taskMapper.queryDynamicRuleIdsFromTaskId(task.getTaskId(),
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum())));
}
return tasks;
@@ -211,8 +225,10 @@ public class TaskService {
return null;
}
task.setStaticRuleIds(taskMapper.queryStaticRuleIdsFromTaskId(task.getTaskId()));
task.setDynamicRuleIds(taskMapper.queryDynamicRuleIdsFromTaskId(task.getTaskId()));
task.setStaticRuleIds(taskMapper.queryStaticRuleIdsFromTaskId(task.getTaskId(),
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum())));
task.setDynamicRuleIds(taskMapper.queryDynamicRuleIdsFromTaskId(task.getTaskId(),
List.of(AuditStatusEnum.AUDITED.getNum(), AuditStatusEnum.USING.getNum())));
return task;
}
@@ -303,8 +319,8 @@ public class TaskService {
return taskMapper.queryTasksByStatus(StateEnum.FINISHED.getStateNum());
}
public Integer queryTaskTotalNum(Integer taskStatus, Integer taskType, String taskName, String taskCreator) {
return taskMapper.queryTaskTotalNum(taskStatus, taskType, taskName, taskCreator);
public Integer queryTaskTotalNum(Integer taskStatus, Integer taskType, String taskName, String taskCreator, Integer auditStatus) {
return taskMapper.queryTaskTotalNum(taskStatus, taskType, taskName, taskCreator, auditStatus);
}
public Object updateAuditStatusBatch(Map<Integer, Integer> idsWithAuditStatusMap) {

View File

@@ -13,13 +13,14 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class StateHandler {
private final WebClient client = WebClient.builder()
.baseUrl("") // todo: unfinished
.baseUrl("http://192.168.107.89:9081")
.build();
protected Boolean handleStart(TaskService taskService, CommandService commandService, Long taskId) {
@@ -79,8 +80,8 @@ public class StateHandler {
return true;
}
// todo: 如果是实时任务或者研判后处置任务,那么就需要在任务启动之后,立刻向动态规则中指定的系统发送日志筛选请求。
// 筛选完成后,系统返回日志,需要由接收端点提取字段,并且合成一条静态规则,再按照任务开始时间、结束时间和任务类型进行指令创建
// 如果是实时任务或者研判后处置任务,那么就需要在任务启动之后,立刻向动态规则中指定的系统发送日志筛选请求。
// 筛选完成后,系统返回日志,需要由接收端点提取字段,并且合成一条静态规则,再按照任务开始时间、结束时间和任务类型进行指令创建
private Boolean handleJudgedTaskStart(TaskService taskService, Task task) {
return sendFilters(taskService, task);
}
@@ -98,6 +99,7 @@ public class StateHandler {
// 将所有关联的静态规则全部设置为已使用状态
taskService.updateStaticRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.USING);
// taskService.changeTaskAuditStatus(task.getTaskId(), AuditStatusEnum.USING.getNum());
commandService.createCommands(staticTaskCommandInfos);
return true;
@@ -110,12 +112,13 @@ public class StateHandler {
throw new IllegalArgumentException("动态规则列表为空,请至少选择一个动态规则以启动动态/研判后类型任务");
}
taskService.updateDynamicRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.AUDITED);
// 将所有关联的动态规则审批状态修改为“已使用”
taskService.updateDynamicRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.USING);
AtomicReference<Boolean> success = new AtomicReference<>(false);
Mono<SimpleResponse> mono = client.post()
.uri("http://192.168.107.89:9081/api/v1/kafkasend") // todo: untested
.uri("/api/v1/kafkasend")
.bodyValue(dynamicTaskInfos)
.exchangeToMono(res -> {
if (res.statusCode().equals(HttpStatus.OK)) {
@@ -126,9 +129,10 @@ public class StateHandler {
})
.doOnError(WebClientResponseException.class, res -> success.set(false));
SimpleResponse response = mono.block();
if (response == null) {
SimpleResponse response = mono.block(Duration.ofSeconds(5));
if (response == null || response.getSuccess() == null) {
return false;
}