diff --git a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java index 401e90d..0d8c225 100644 --- a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java +++ b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java @@ -129,6 +129,8 @@ public class TaskCommandInfo { private Long rcpHitCount; + private Long totalPacketNum; + // private String hashValue; diff --git a/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java b/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java new file mode 100644 index 0000000..293a887 --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java @@ -0,0 +1,46 @@ +package com.realtime.protection.configuration.utils.enums; + +import com.realtime.protection.server.task.status.states.State; +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; +@Getter +public enum CommandStatusEnum { + START(1),//指令生成(研判后任务需要研判,其他任务可直接下发) + SEND(2),//(已研判)下发 + RCPHIT(3),//.. + FLOWHIT(4),//.. + UNRCPHIT(5),//.. + UNFLOWHIT(6),//.. + IGNORE(7),//本次忽略 + WHOLEIGNORE(8),//全部忽略 + SUCCESS(9),//下发成功 + FAIL(10),//下发失败 + STOP(11), + PAUSE(12), + CANCEL(13),//撤销 + END(14);//任务结束 + + private final int commandStatusNum; + private static final Map map = new HashMap<>(); + + static { + for (CommandStatusEnum commandStatus : CommandStatusEnum.values()) { + map.put(commandStatus.getCommandStatusNum(), commandStatus); + } + } + + CommandStatusEnum(int commandStatus) { + this.commandStatusNum = commandStatus; + } + + public static CommandStatusEnum getCommandStatusByNum(Integer commandStatus) { + if (commandStatus == null) { + return null; + } + return map.get(commandStatus); + } + +} + diff --git a/src/main/java/com/realtime/protection/configuration/utils/enums/StateEnum.java b/src/main/java/com/realtime/protection/configuration/utils/enums/StateEnum.java index 91ea1d8..0efbd07 100644 --- a/src/main/java/com/realtime/protection/configuration/utils/enums/StateEnum.java +++ b/src/main/java/com/realtime/protection/configuration/utils/enums/StateEnum.java @@ -16,7 +16,11 @@ public enum StateEnum { PAUSED(3, new PauseState()), STOP(4, new StopState()), FINISHED(5, new FinishedState()), - FAILED(6, new FailedState()); + FAILED(6, new FailedState()), + RUNNING_SUCCESS(7, new RunningSuccessState()), + RUNNING_PARTIAL_SUCCESS(8, new RunningPartialSuccessState()), + RUNNING_FAILED(9, new RunningFailState()); + // ---------------------------------------------- private final State state; diff --git a/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageService.java b/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageService.java index 88fce67..6efbb42 100644 --- a/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageService.java +++ b/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageService.java @@ -6,14 +6,17 @@ import com.realtime.protection.configuration.entity.defense.template.ProtectLeve import com.realtime.protection.configuration.entity.task.FiveTupleWithMask; import com.realtime.protection.configuration.entity.task.TaskCommandInfo; import com.realtime.protection.configuration.utils.Counter; +import com.realtime.protection.configuration.utils.Subnet; import com.realtime.protection.configuration.utils.enums.StateEnum; import com.realtime.protection.configuration.utils.enums.TaskTypeEnum; import com.realtime.protection.server.command.CommandService; +import com.realtime.protection.server.task.TaskService; import com.realtime.protection.server.task.status.StateHandler; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.net.UnknownHostException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -28,13 +31,15 @@ import java.util.UUID; public class AlertMessageService { private final CommandService commandService; private final AlertMessageMapper alertMessageMapper; + private final TaskService taskService; private final Counter counter; private final StateHandler stateHandler; - public AlertMessageService(CommandService commandService, AlertMessageMapper alertMessageMapper, + public AlertMessageService(CommandService commandService, AlertMessageMapper alertMessageMapper, TaskService taskService, Counter counter, StateHandler stateHandler) { this.commandService = commandService; this.alertMessageMapper = alertMessageMapper; + this.taskService = taskService; this.counter = counter; this.stateHandler = stateHandler; } @@ -67,6 +72,15 @@ public class AlertMessageService { case RUNNING: insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 1, alertMessage); break; + case RUNNING_FAILED: + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; + case RUNNING_PARTIAL_SUCCESS: + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; + case RUNNING_SUCCESS: + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; case PAUSED: insertCommandAndAlertMessage(dynamicTaskCommandInfoList, false, 1, alertMessage); break; @@ -78,7 +92,16 @@ public class AlertMessageService { } else if (taskType == TaskTypeEnum.JUDGED.getTaskType())//研判后 switch (StateEnum.getStateEnumByNum(taskStatus)) { - case RUNNING: + case RUNNING : + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; + case RUNNING_FAILED: + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; + case RUNNING_PARTIAL_SUCCESS: + insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); + break; + case RUNNING_SUCCESS: insertCommandAndAlertMessage(dynamicTaskCommandInfoList, true, 0, alertMessage); break; case PAUSED: @@ -134,6 +157,7 @@ public class AlertMessageService { Integer isJudged, AlertMessage alertMessage){ List commandUUIDs = new ArrayList<>(); + List subnetList = new ArrayList<>(); for (TaskCommandInfo dynamicTaskCommandInfo : dynamicTaskCommandInfoList ){ //command入库 @@ -155,7 +179,23 @@ public class AlertMessageService { ); alertMessageMapper.insertAlertMessage(alertMessage); + + try { + //抽取告警生成的指令没有掩码 + String sip = dynamicTaskCommandInfo.getFiveTupleWithMask().getSourceIP(); + String msip = "255.255.255.255"; + String dip = dynamicTaskCommandInfo.getFiveTupleWithMask().getDestinationIP(); + String mdip = "255.255.255.255"; + + if (sip != null) subnetList.add(new Subnet(sip,msip)); + if (dip != null) subnetList.add(new Subnet(dip,mdip)); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } + Long ipTotalNum = taskService.ipWithMaskToIpNums(subnetList); + taskService.updateTaskIpTotalNum(ipTotalNum, dynamicTaskCommandInfo.getTaskId()); + } + } private String insertAlertMessageOnly(AlertMessage alertMessage){ diff --git a/src/main/java/com/realtime/protection/server/command/CommandMapper.java b/src/main/java/com/realtime/protection/server/command/CommandMapper.java index 93b7860..27e493a 100644 --- a/src/main/java/com/realtime/protection/server/command/CommandMapper.java +++ b/src/main/java/com/realtime/protection/server/command/CommandMapper.java @@ -57,7 +57,7 @@ public interface CommandMapper { void updateCommandHistoryExpireTimeBatch(@Param("commandIds")List commandIds); - void insertCommandHistoryBatch(@Param("commandIds")List commandIds); + void insertCommandHistoryBatch(@Param("commandIds")List commandIds, @Param("command_status")Integer commandStatus); @DS("mysql") List whiteListCommandCheck(@Param("command") FiveTupleWithMask fiveTupleWithMask); @@ -83,4 +83,20 @@ public interface CommandMapper { Boolean setCommandValid(String commandId, Integer isValid); List queryAllDistributingCommandInfo(); + + List queryRunningCommandsDistributeStatusByTaskId(Long taskId); + + List queryRunningCommandsRcpHitCount(); + + Integer queryCommandLogRcpHitCountByCommandId(@Param("command_id")String commandId); + + void insertCommandHistoryWithStatus(@Param("command_id")String commandUUID, + @Param("log_id")String logId, + @Param("command_status")Integer commandStatus); + + List queryCommandInfosByTaskId(@Param("task_id")Long taskId); + + List queryRunningCommandsTotalPacketNum(); + + Integer queryCommandLogTotalPacketNumByCommandId(@Param("command_id")String uuid); } diff --git a/src/main/java/com/realtime/protection/server/command/CommandService.java b/src/main/java/com/realtime/protection/server/command/CommandService.java index d22a9c1..7eee0a9 100644 --- a/src/main/java/com/realtime/protection/server/command/CommandService.java +++ b/src/main/java/com/realtime/protection/server/command/CommandService.java @@ -7,6 +7,7 @@ import com.realtime.protection.configuration.entity.task.TaskCommandInfo; import com.realtime.protection.configuration.entity.whitelist.WhiteListObject; import com.realtime.protection.configuration.utils.Counter; import com.realtime.protection.configuration.utils.SqlSessionWrapper; +import com.realtime.protection.configuration.utils.enums.CommandStatusEnum; import com.realtime.protection.server.task.status.StateHandler; import com.realtime.protection.server.whitelist.WhiteListMapper; import lombok.extern.slf4j.Slf4j; @@ -91,13 +92,14 @@ public class CommandService { commandMapper.insertCommandTraffic(commandInfo); //写入历史表 - insertCommandHistory(commandInfo.getUUID()); + insertCommandHistory(commandInfo.getUUID(), CommandStatusEnum.START.getCommandStatusNum()); return commandInfo.getUUID(); } @DSTransactional public String createCommand2(TaskCommandInfo commandInfo, Integer isJudged) { String uuid = commandMapper.queryCommandInfo(commandInfo); + //如果指令已经存在,除了研判状态为2,时需要改为0,其他情况都直接返回uuid if (uuid != null) { if (isJudged == 0){ @@ -106,6 +108,8 @@ public class CommandService { //如果研判状态为2,表示之前设置了本次忽略,那这次生成指令后,将其研判状态改为0,需要再次研判 if (originalIsJudged == 2){ commandMapper.updateCommandIsJudgedIfIgnoreThisTime(uuid); + //写入历史表 + insertCommandHistory(commandInfo.getUUID(), CommandStatusEnum.START.getCommandStatusNum()); } } return uuid; @@ -139,7 +143,8 @@ public class CommandService { commandMapper.insertCommandRCPQuery(commandInfo); commandMapper.insertCommandTraffic(commandInfo); //写入历史表 - insertCommandHistory(commandInfo.getUUID()); + insertCommandHistory(commandInfo.getUUID(), CommandStatusEnum.START.getCommandStatusNum()); + //发送指令新建信号...实时任务 isJudged=1 才首次立刻下发 try { @@ -230,9 +235,7 @@ public class CommandService { return commandMapper.queryCommandInfoByUUID(uuid); } - public List queryAllDistributingCommandInfo() { - return commandMapper.queryAllDistributingCommandInfo(); - } + public Boolean startCommandsByTaskId(Long taskId) { @@ -244,7 +247,14 @@ public class CommandService { } public Boolean removeCommandsByTaskId(Long taskId) { - return commandMapper.removeCommandsByTaskId(taskId); + Boolean ok = commandMapper.removeCommandsByTaskId(taskId); + //查询任务下的指令,将指令写入历史表 + List taskCommandIds = commandMapper.queryCommandInfosByTaskId(taskId); + for (String commandId : taskCommandIds) { + insertCommandHistory(commandId, CommandStatusEnum.END.getCommandStatusNum()); + } + return ok; + } public Boolean setCommandJudged(String commandId, Integer isJudged) { @@ -253,7 +263,19 @@ public class CommandService { //设置指令是否已经研判 Boolean success = commandMapper.setCommandJudged(commandId, isJudged); //研判状态也写入历史表 - insertCommandHistory(commandId); + switch (isJudged){ + case 1: + insertCommandHistory(commandId, CommandStatusEnum.SEND.getCommandStatusNum()); + break; + case 2: + insertCommandHistory(commandId, CommandStatusEnum.IGNORE.getCommandStatusNum()); + break; + case 3: + insertCommandHistory(commandId, CommandStatusEnum.WHOLEIGNORE.getCommandStatusNum()); + break; + default: + break; + } try { List commandUUIDs = Collections.singletonList(commandId); @@ -295,6 +317,13 @@ public class CommandService { commandMapper.insertCommandHistory(commandUUID, logId); } + public void insertCommandHistory(String commandUUID,Integer commandStatus) { + //todo: 不update, insert加入uuid +// commandMapper.updateCommandHistoryExpireTime(commandUUID); + String logId = UUID.randomUUID().toString(); + commandMapper.insertCommandHistoryWithStatus(commandUUID, logId, commandStatus); + } + public void insertCommandHistoryBatch(List commandIdList) { List commandIds = ListUtils.newArrayListWithExpectedSize(commandIdList.size()); commandIdList.forEach(item -> commandIds.add(item.getUUID())); @@ -306,13 +335,35 @@ public class CommandService { // logIds.add(UUID.randomUUID().toString()); // } //新建的loguuid拿commannd_id来定顶一会吧 - commandMapper.insertCommandHistoryBatch(commandIds); + commandMapper.insertCommandHistoryBatch(commandIds, CommandStatusEnum.START.getCommandStatusNum()); } //指令提前撤回下发 public Boolean setCommandValid(String commandId, Integer isValid) { Boolean isture = commandMapper.setCommandValid(commandId, isValid); - insertCommandHistory(commandId); + insertCommandHistory(commandId, CommandStatusEnum.CANCEL.getCommandStatusNum()); return isture; } + + public List queryRunningCommandsDistributeStatusByTaskId(Long taskId) { + List commandStatusList = commandMapper.queryRunningCommandsDistributeStatusByTaskId(taskId); + return commandStatusList; + + } + + public List queryRunningCommandsRcpHitCount() { + return commandMapper.queryRunningCommandsRcpHitCount(); + } + + public Integer queryCommandLogRcpHitCountByCommandId(String commandI) { + return commandMapper.queryCommandLogRcpHitCountByCommandId(commandI); + } + + public List queryRunningCommandsTotalPacketNum() { + return commandMapper.queryRunningCommandsTotalPacketNum(); + } + + public Integer queryCommandLogTotalPacketNumByCommandId(String uuid) { + return commandMapper.queryCommandLogTotalPacketNumByCommandId(uuid); + } } diff --git a/src/main/java/com/realtime/protection/server/rule/dynamicrule/DynamicRuleService.java b/src/main/java/com/realtime/protection/server/rule/dynamicrule/DynamicRuleService.java index 397f53c..eea36e4 100644 --- a/src/main/java/com/realtime/protection/server/rule/dynamicrule/DynamicRuleService.java +++ b/src/main/java/com/realtime/protection/server/rule/dynamicrule/DynamicRuleService.java @@ -87,6 +87,12 @@ public class DynamicRuleService { switch (StateEnum.getStateEnumByNum(taskStatus)){ case RUNNING: throw new IllegalArgumentException("使用该动态规则的任务处于运行状态"); + case RUNNING_FAILED: + throw new IllegalArgumentException("使用该动态规则的任务处于运行失败状态"); + case RUNNING_PARTIAL_SUCCESS: + throw new IllegalArgumentException("使用该动态规则的任务处于部分成功状态"); + case RUNNING_SUCCESS: + throw new IllegalArgumentException("使用该动态规则的任务处于运行成功状态"); case PAUSED: throw new IllegalArgumentException("使用该动态规则的任务处于暂停状态"); default: diff --git a/src/main/java/com/realtime/protection/server/task/TaskController.java b/src/main/java/com/realtime/protection/server/task/TaskController.java index 3bb1720..15f37e3 100644 --- a/src/main/java/com/realtime/protection/server/task/TaskController.java +++ b/src/main/java/com/realtime/protection/server/task/TaskController.java @@ -410,8 +410,15 @@ public class TaskController implements TaskControllerApi { return ResponseResult.ok() .setData("total_num", taskService.queryTaskTotalNum(null, null, null, null, null, null, null, null, null,null,null,null,null)) - .setData("running_num", taskService.queryTaskTotalNum(StateEnum.RUNNING.getStateNum(), null, null, null, null, - null, null, null, null,null,null,null,null)) + .setData("running_num", + taskService.queryTaskTotalNum(StateEnum.RUNNING.getStateNum(), null, null, null, null, + null, null, null, null,null,null,null,null) + + taskService.queryTaskTotalNum(StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum(), null, null, null, null, + null, null, null, null,null,null,null,null) + + taskService.queryTaskTotalNum(StateEnum.RUNNING_SUCCESS.getStateNum(), null, null, null, null, + null, null, null, null,null,null,null,null) + + taskService.queryTaskTotalNum(StateEnum.RUNNING_FAILED.getStateNum(), null, null, null, null, + null, null, null, null,null,null,null,null)) .setData("finished_num", taskService.queryTaskTotalNum(StateEnum.FINISHED.getStateNum(), null, null, null, null, null, null, null, null,null,null,null,null)) .setData("unaudit_num", taskService.queryAuditTaskTotalNum(AuditStatusEnum.PENDING.getNum())) diff --git a/src/main/java/com/realtime/protection/server/task/TaskMapper.java b/src/main/java/com/realtime/protection/server/task/TaskMapper.java index bfd0317..7f51476 100644 --- a/src/main/java/com/realtime/protection/server/task/TaskMapper.java +++ b/src/main/java/com/realtime/protection/server/task/TaskMapper.java @@ -147,4 +147,8 @@ public interface TaskMapper { List queryTaskStausLog(Long id, Integer page, Integer pageSize); Integer queryTaskStausLogTotalNum(Long id); + + List queryRunningTasks(Integer running, Integer allSuccess, Integer partSuccess, Integer allFail); + + Integer queryIpTotalNum(Long taskId); } diff --git a/src/main/java/com/realtime/protection/server/task/TaskService.java b/src/main/java/com/realtime/protection/server/task/TaskService.java index dce270c..59d2225 100644 --- a/src/main/java/com/realtime/protection/server/task/TaskService.java +++ b/src/main/java/com/realtime/protection/server/task/TaskService.java @@ -94,7 +94,7 @@ public class TaskService { } // totalIPs = (long) mergedSubnets.stream().mapToInt(Subnet::getNumberOfHosts).sum(); - log.info("合并后的子网涉及的总IP数量是: " + totalIPs); +// log.debug("合并后的子网涉及的总IP数量是: " + totalIPs); } catch (UnknownHostException e) { @@ -483,7 +483,8 @@ public class TaskService { if (task == null) { return true; } - if (task.getTaskStatus() == StateEnum.RUNNING.getStateNum()) { + if (task.getTaskStatus() == StateEnum.RUNNING.getStateNum() || task.getTaskStatus() == StateEnum.RUNNING_FAILED.getStateNum() + || task.getTaskStatus() == StateEnum.RUNNING_SUCCESS.getStateNum()|| task.getTaskStatus() == StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum()){ throw new IllegalArgumentException("任务正在运行中,无法删除"); } //有的规则被任务选择了,但任务还没有启动,此时规则的状态不是已使用,但used_task_id已经被设置 @@ -803,4 +804,15 @@ public class TaskService { public Integer queryTaskStausLogTotalNum(Long id) { return taskMapper.queryTaskStausLogTotalNum(id); } + + public List getRunningTasks() { + return taskMapper.queryRunningTasks(StateEnum.RUNNING.getStateNum(),StateEnum.RUNNING_SUCCESS.getStateNum(), + StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum(),StateEnum.RUNNING_FAILED.getStateNum()); + } + + public void updateTaskIpTotalNum(Long ipTotalNum, Long taskId) { + Integer taskIpNum = taskMapper.queryIpTotalNum(taskId); + ipTotalNum = ipTotalNum + taskIpNum; + taskMapper.setIpTotalNum(ipTotalNum, taskId); + } } diff --git a/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java b/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java index 16dc265..8d688ba 100644 --- a/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java +++ b/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java @@ -1,7 +1,9 @@ package com.realtime.protection.server.task.status; import com.baomidou.dynamic.datasource.annotation.DSTransactional; +import com.realtime.protection.configuration.entity.task.TaskCommandInfo; import com.realtime.protection.configuration.exception.DorisStartException; +import com.realtime.protection.configuration.utils.enums.CommandStatusEnum; import com.realtime.protection.configuration.utils.enums.StateEnum; import com.realtime.protection.server.task.status.states.State; import com.realtime.protection.server.command.CommandService; @@ -138,4 +140,134 @@ public class StateChangeService { } } + /** + * 更新任务状态,要么加字段(运行中加两个)、“要么改状态模式(字典表也要改)” + */ + @Scheduled(cron = "0 0/5 * * * ?") + @Async + protected void updateTasksRunningStatus() { + //查询所有正在运行中的任务 + List runningTaskIds = taskService.getRunningTasks(); + log.info("更新任务运行中状态:成功扫描出所有正在运行中的任务:{}", runningTaskIds); + + for (Long taskId : runningTaskIds) { + //查询任务下所有的正在下发的指令(isjudge = 1 ,isvalid=1, is_deleted=0)的本次是否下发成功字段(t_command加个字段) + List commandStatusList = commandService.queryRunningCommandsDistributeStatusByTaskId(taskId); + if (commandStatusList == null){ + continue; + } + //判断是否全为成功 + if (commandStatusList.stream().allMatch(status -> status == null || status == 1)) { + //全为成功 + try { + changeState(StateEnum.RUNNING_SUCCESS.getStateNum(), taskId, true); + } catch (Exception e) { + log.warn(String.format("任务%d从%s状态变为运行中RUNNING_SUCCESS状态遭遇异常:%s", + taskId, taskService.queryTaskStatus(taskId), e.getMessage())); + } + } else if (commandStatusList.stream().anyMatch(status -> status == null || status == 1)) { + //部分成功 + try { + changeState(StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum(), taskId, true); + } catch (Exception e) { + log.warn(String.format("任务%d从%s状态变为运行中RUNNING_PARTIAL_SUCCESS状态遭遇异常:%s", + taskId, taskService.queryTaskStatus(taskId), e.getMessage())); + } + }else{ + //全失败 + try { + changeState(StateEnum.RUNNING_FAILED.getStateNum(), taskId, true); + } catch (Exception e) { + log.warn(String.format("任务%d从%s状态变为运行中RUNNING_FAILED状态遭遇异常:%s", + taskId, taskService.queryTaskStatus(taskId), e.getMessage())); + } + } + } + } + + + /** + * 更新指令rcp命中状态 + */ + @Scheduled(cron = "0 1/5 * * * ?") + @Async + protected void updateCommandRCPHitStatus() { + //获取所有正在下发的指令 + List commandRcpHitCountList = commandService.queryRunningCommandsRcpHitCount(); + log.info("rcp命中状态更新:正在下发的指令数量:{}", commandRcpHitCountList.size()); + //遍历所有指令 + for (TaskCommandInfo taskCommandInfo : commandRcpHitCountList) { + //查询当前指令的rcp-hitcount字段 + Long rcpHitCount = taskCommandInfo.getRcpHitCount(); + //查询当前历史表中该指令上一条日志的rcp-hitcount字段 + Integer lastRcpHitCount = commandService.queryCommandLogRcpHitCountByCommandId(taskCommandInfo.getUUID()); + //比较是否有变化 + if (rcpHitCount == null ) { + //更新log表,rcp没命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum()); + continue; + } + if (lastRcpHitCount == null){ + //更新log表,rcp命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum()); + continue; + } + + if (rcpHitCount > lastRcpHitCount) { + //更新log表,rcp命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum()); + } else { + //更新log表,rcp没命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum()); + } + + } + + } + + + /** + * 更新指令rcp命中状态 + */ + @Scheduled(cron = "0 3/5 * * * ?") + @Async + protected void updateCommandTrafficHitStatus() { + //查询当前历史表中该指令上一条日志的统计流量大小字段 + + //比较是否有变化 + + //更新log表,流量命中、流量未命中 + //获取所有正在下发的指令 + List commandTotalPacketNumList = commandService.queryRunningCommandsTotalPacketNum(); + log.info("流量命中状态:查询正在下发的指令数量:{}", commandTotalPacketNumList.size()); + //遍历所有指令 + for (TaskCommandInfo taskCommandInfo : commandTotalPacketNumList) { + //查询当前指令的统计流量大小字段 + Long totalPacketNum = taskCommandInfo.getTotalPacketNum(); + //查询当前历史表中该指令上一条日志的rcp-hitcount字段 + Integer lastTotalPacketNum = commandService.queryCommandLogTotalPacketNumByCommandId(taskCommandInfo.getUUID()); + //比较是否有变化 + if (totalPacketNum == null ) { + //更新log表,rcp没命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum()); + continue; + } + if (lastTotalPacketNum == null){ + //更新log表,rcp命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum()); + continue; + } + + if (totalPacketNum > lastTotalPacketNum) { + //更新log表,rcp命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum()); + } else { + //更新log表,rcp没命中 + commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum()); + } + + } + + } + } diff --git a/src/main/java/com/realtime/protection/server/task/status/StateHandler.java b/src/main/java/com/realtime/protection/server/task/status/StateHandler.java index cbbdad9..55161cd 100644 --- a/src/main/java/com/realtime/protection/server/task/status/StateHandler.java +++ b/src/main/java/com/realtime/protection/server/task/status/StateHandler.java @@ -4,6 +4,7 @@ import com.realtime.protection.configuration.entity.task.DynamicTaskInfo; import com.realtime.protection.configuration.entity.task.Task; import com.realtime.protection.configuration.entity.task.TaskCommandInfo; import com.realtime.protection.configuration.response.SimpleResponse; +import com.realtime.protection.configuration.utils.enums.StateEnum; import com.realtime.protection.configuration.utils.enums.TaskTypeEnum; import com.realtime.protection.configuration.utils.enums.audit.AuditStatusEnum; import com.realtime.protection.server.command.CommandService; @@ -115,7 +116,6 @@ public class StateHandler { return true; } - protected Boolean handleFailed(CommandService commandService, TaskService taskService, Long taskId) { commandService.removeCommandsByTaskId(taskId); taskService.updateDynamicRuleAuditStatusInTask(taskId, AuditStatusEnum.AUDITED); @@ -126,6 +126,64 @@ public class StateHandler { return true; } + protected Boolean handleRunningFail(CommandService commandService, TaskService taskService, Long taskId) { + //查询任务状态 + Integer taskStatusNum = taskService.queryTaskStatus(taskId); + + if (taskStatusNum == StateEnum.RUNNING_SUCCESS.getStateNum()){ + return true; + } + else{ + //修改任务状态 + taskService.changeTaskStatus(taskId, StateEnum.RUNNING_SUCCESS.getStateNum()); + //插入历史表 + taskService.insertTaskStatusLog(taskId); + } + return true; + } + + protected Boolean handleRunningPartialSuccess(CommandService commandService, TaskService taskService, Long taskId) { + ///查询任务状态 + Integer taskStatusNum = taskService.queryTaskStatus(taskId); + + if (taskStatusNum == StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum()){ + return true; + } + else{ + //修改任务状态 + taskService.changeTaskStatus(taskId, StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum()); + //插入历史表 + taskService.insertTaskStatusLog(taskId); + } + return true; + } + + protected Boolean handleRunningSuccess(CommandService commandService, TaskService taskService, Long taskId) { + ///查询任务状态 + Integer taskStatusNum = taskService.queryTaskStatus(taskId); + + if (taskStatusNum == StateEnum.RUNNING_FAILED.getStateNum()){ + return true; + } + else{ + //修改任务状态 + taskService.changeTaskStatus(taskId, StateEnum.RUNNING_FAILED.getStateNum()); + //插入历史表 + taskService.insertTaskStatusLog(taskId); + } + return true; + } + + protected Boolean handleAllFail(CommandService commandService, TaskService taskService, Long taskId) { + commandService.removeCommandsByTaskId(taskId); + taskService.updateDynamicRuleAuditStatusInTask(taskId, AuditStatusEnum.AUDITED); + // 要删去规则的used_task_id,因为在新建时检查了是否有重复使用的规则 + taskService.removeDynamicRuleUsedTaskIdInTask(taskId); + taskService.updateStaticRuleAuditStatusInTask(taskId, AuditStatusEnum.AUDITED); + taskService.removeStaticRuleUsedTaskIdInTask(taskId); + return true; + } + // 如果是实时任务或者研判后处置任务,那么就需要在任务启动之后,立刻向动态规则中指定的系统发送日志筛选请求。 // 筛选完成后,系统返回日志,需要由接收端点提取字段,并且合成一条静态规则,再按照任务开始时间、结束时间和任务类型进行指令创建 private Boolean handleJudgedTaskStart(TaskService taskService, Task task) { diff --git a/src/main/java/com/realtime/protection/server/task/status/states/RunningFailState.java b/src/main/java/com/realtime/protection/server/task/status/states/RunningFailState.java new file mode 100644 index 0000000..68e3f33 --- /dev/null +++ b/src/main/java/com/realtime/protection/server/task/status/states/RunningFailState.java @@ -0,0 +1,21 @@ +package com.realtime.protection.server.task.status.states; + +import com.realtime.protection.configuration.utils.enums.StateEnum; +import com.realtime.protection.server.command.CommandService; +import com.realtime.protection.server.task.TaskService; +import com.realtime.protection.server.task.status.StateHandler; + +public class RunningFailState extends StateHandler implements State { + @Override + public Boolean handle(State newState, CommandService commandService, TaskService taskService, Long taskId) { + return switch (StateEnum.getStateEnumByState(newState)) { + case PAUSED -> handlePause(commandService, taskId); + case STOP -> handleStop(commandService, taskService, taskId); + case FINISHED -> handleFinish(commandService, taskService, taskId); + case FAILED -> handleFailed(commandService, taskService, taskId); + case RUNNING_SUCCESS -> handleRunningSuccess(commandService, taskService, taskId); + case RUNNING_PARTIAL_SUCCESS -> handleRunningPartialSuccess(commandService, taskService, taskId); + default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState)); + }; + } +} \ No newline at end of file diff --git a/src/main/java/com/realtime/protection/server/task/status/states/RunningPartialSuccessState.java b/src/main/java/com/realtime/protection/server/task/status/states/RunningPartialSuccessState.java new file mode 100644 index 0000000..c6cee31 --- /dev/null +++ b/src/main/java/com/realtime/protection/server/task/status/states/RunningPartialSuccessState.java @@ -0,0 +1,21 @@ +package com.realtime.protection.server.task.status.states; + +import com.realtime.protection.configuration.utils.enums.StateEnum; +import com.realtime.protection.server.command.CommandService; +import com.realtime.protection.server.task.TaskService; +import com.realtime.protection.server.task.status.StateHandler; + +public class RunningPartialSuccessState extends StateHandler implements State { + @Override + public Boolean handle(State newState, CommandService commandService, TaskService taskService, Long taskId) { + return switch (StateEnum.getStateEnumByState(newState)) { + case PAUSED -> handlePause(commandService, taskId); + case STOP -> handleStop(commandService, taskService, taskId); + case FINISHED -> handleFinish(commandService, taskService, taskId); + case FAILED -> handleFailed(commandService, taskService, taskId); + case RUNNING_SUCCESS -> handleRunningSuccess(commandService, taskService, taskId); + case RUNNING_FAILED -> handleRunningFail(commandService, taskService, taskId); + default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState)); + }; + } +} diff --git a/src/main/java/com/realtime/protection/server/task/status/states/RunningState.java b/src/main/java/com/realtime/protection/server/task/status/states/RunningState.java index 0b3fcdb..84923b8 100644 --- a/src/main/java/com/realtime/protection/server/task/status/states/RunningState.java +++ b/src/main/java/com/realtime/protection/server/task/status/states/RunningState.java @@ -13,6 +13,9 @@ public class RunningState extends StateHandler implements State { case STOP -> handleStop(commandService, taskService, taskId); case FINISHED -> handleFinish(commandService, taskService, taskId); case FAILED -> handleFailed(commandService, taskService, taskId); + case RUNNING_SUCCESS -> handleRunningSuccess(commandService, taskService, taskId); + case RUNNING_PARTIAL_SUCCESS -> handleRunningPartialSuccess(commandService, taskService, taskId); + case RUNNING_FAILED -> handleRunningFail(commandService, taskService, taskId); default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState)); }; } diff --git a/src/main/java/com/realtime/protection/server/task/status/states/RunningSuccessState.java b/src/main/java/com/realtime/protection/server/task/status/states/RunningSuccessState.java new file mode 100644 index 0000000..2a9ed52 --- /dev/null +++ b/src/main/java/com/realtime/protection/server/task/status/states/RunningSuccessState.java @@ -0,0 +1,22 @@ +package com.realtime.protection.server.task.status.states; + +import com.realtime.protection.configuration.utils.enums.StateEnum; +import com.realtime.protection.server.command.CommandService; +import com.realtime.protection.server.task.TaskService; +import com.realtime.protection.server.task.status.StateHandler; + + +public class RunningSuccessState extends StateHandler implements State { + @Override + public Boolean handle(State newState, CommandService commandService, TaskService taskService, Long taskId) { + return switch (StateEnum.getStateEnumByState(newState)) { + case PAUSED -> handlePause(commandService, taskId); + case STOP -> handleStop(commandService, taskService, taskId); + case FINISHED -> handleFinish(commandService, taskService, taskId); + case FAILED -> handleFailed(commandService, taskService, taskId); + case RUNNING_FAILED -> handleRunningFail(commandService, taskService, taskId); + case RUNNING_PARTIAL_SUCCESS -> handleRunningPartialSuccess(commandService, taskService, taskId); + default -> throw new IllegalStateException("错误的状态: " + StateEnum.getStateEnumByState(newState)); + }; + } +} \ No newline at end of file diff --git a/src/main/resources/mappers/CommandMapper.xml b/src/main/resources/mappers/CommandMapper.xml index 7b51f89..e7b00bc 100644 --- a/src/main/resources/mappers/CommandMapper.xml +++ b/src/main/resources/mappers/CommandMapper.xml @@ -372,7 +372,8 @@ last_traffic_query_time, log_uuid, sip_int, - dip_int + dip_int, + command_status ) select NOW(), @@ -428,7 +429,8 @@ last_traffic_query_time, COMMAND_ID, sip_int, - dip_int + dip_int, + #{command_status} from t_command where COMMAND_ID IN @@ -443,7 +445,123 @@ (#{command_id}, #{whiteList.whiteListId}) - + + insert into t_command_log( + effective_time, + expire_time, + TASK_ID, + RULE_ID, + COMMAND_ID, + TASKTYPE, + ADDR_TYPE, + SRC_IP, + SRC_PORT, + DST_IP, + DST_PORT, + PROTOCOL, + MASK_SRC_IP, + MASK_SRC_PORT, + MASK_DST_IP, + MASK_DST_PORT, + MASK_PROTOCOL, + TASK_ACT, + EVENTTYPE, + TASKNAME, + DISTRIBUTEPOINT, + DEPARTMENT, + FREQUENCY, + VALID_TIME, + INVALID_TIME, + IS_VALID, + IS_JUDGED, + SEND_TIMES, + SUCCESS_TIMES, + FIRST_SEND_TIME, + LAST_SEND_TIME, + CREATE_TIME, + LAST_UPDATE, + IS_DELETED, + RULE_NAME, + RCP_HIT_COUNT, + TOTAL_PACKET_NUM, + TOTAL_BYTE_NUM, + EFFECTIVE_EQUIPMENT_NUM, + AVERAGE_LATENCY, + MAX_LATENCY, + MIN_LATENCY, + c2s_pkt_num, + s2c_pkt_num, + c2s_byte_num, + s2c_byte_num, + display_id, + session_num, + first_effect_time, + last_rcp_query_time, + last_traffic_query_time, + log_uuid, + sip_int, + dip_int, + command_status + ) + select + NOW(), + NULL, + TASK_ID, + RULE_ID, + COMMAND_ID, + TASKTYPE, + ADDR_TYPE, + SRC_IP, + SRC_PORT, + DST_IP, + DST_PORT, + PROTOCOL, + MASK_SRC_IP, + MASK_SRC_PORT, + MASK_DST_IP, + MASK_DST_PORT, + MASK_PROTOCOL, + TASK_ACT, + EVENTTYPE, + TASKNAME, + DISTRIBUTEPOINT, + DEPARTMENT, + FREQUENCY, + VALID_TIME, + INVALID_TIME, + IS_VALID, + IS_JUDGED, + SEND_TIMES, + SUCCESS_TIMES, + FIRST_SEND_TIME, + LAST_SEND_TIME, + CREATE_TIME, + LAST_UPDATE, + IS_DELETED, + RULE_NAME, + RCP_HIT_COUNT, + TOTAL_PACKET_NUM, + TOTAL_BYTE_NUM, + EFFECTIVE_EQUIPMENT_NUM, + AVERAGE_LATENCY, + MAX_LATENCY, + MIN_LATENCY, + c2s_pkt_num, + s2c_pkt_num, + c2s_byte_num, + s2c_byte_num, + display_id, + session_num, + first_effect_time, + last_rcp_query_time, + last_traffic_query_time, + #{log_id}, + sip_int, + dip_int, + #{command_status} + from t_command + where COMMAND_ID = #{command_id} + @@ -457,6 +575,7 @@ + @@ -717,4 +836,48 @@ AND IS_VALID = 1 AND IS_DELETED = 0 + + + + + + + diff --git a/src/main/resources/mappers/TaskMapper.xml b/src/main/resources/mappers/TaskMapper.xml index 1b1501a..bc114d5 100644 --- a/src/main/resources/mappers/TaskMapper.xml +++ b/src/main/resources/mappers/TaskMapper.xml @@ -772,4 +772,14 @@ SELECT COUNT(*) FROM t_task_status_log WHERE task_id = #{id} + + \ No newline at end of file