diff --git a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java index 0d8c225..d46ed61 100644 --- a/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java +++ b/src/main/java/com/realtime/protection/configuration/entity/task/TaskCommandInfo.java @@ -131,6 +131,14 @@ public class TaskCommandInfo { private Long totalPacketNum; + private int commandStatus; + + private LocalDateTime lastTrafficQueryTime; + + private LocalDateTime lastRCPQueryTime; + + private String logUUID; + // private String hashValue; diff --git a/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java b/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java index 293a887..7e7ec43 100644 --- a/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java +++ b/src/main/java/com/realtime/protection/configuration/utils/enums/CommandStatusEnum.java @@ -8,19 +8,20 @@ import java.util.Map; @Getter public enum CommandStatusEnum { START(1),//指令生成(研判后任务需要研判,其他任务可直接下发) - SEND(2),//(已研判)下发 + SEND(2),// 下发,发送c3 RCPHIT(3),//.. FLOWHIT(4),//.. UNRCPHIT(5),//.. UNFLOWHIT(6),//.. IGNORE(7),//本次忽略 WHOLEIGNORE(8),//全部忽略 - SUCCESS(9),//下发成功 - FAIL(10),//下发失败 - STOP(11), - PAUSE(12), - CANCEL(13),//撤销 - END(14);//任务结束 + JUDGED(9),// 研判通过 + SUCCESS(10),//下发成功 + FAIL(11),//下发失败 + STOP(12), + PAUSE(13), + CANCEL(14),//撤销 + END(0);//任务结束 private final int commandStatusNum; private static final Map map = new HashMap<>(); diff --git a/src/main/java/com/realtime/protection/server/command/CommandMapper.java b/src/main/java/com/realtime/protection/server/command/CommandMapper.java index 27e493a..54002ad 100644 --- a/src/main/java/com/realtime/protection/server/command/CommandMapper.java +++ b/src/main/java/com/realtime/protection/server/command/CommandMapper.java @@ -7,6 +7,7 @@ import com.realtime.protection.configuration.entity.whitelist.WhiteListObject; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; +import java.time.LocalDateTime; import java.util.List; @Mapper @@ -86,8 +87,6 @@ public interface CommandMapper { List queryRunningCommandsDistributeStatusByTaskId(Long taskId); - List queryRunningCommandsRcpHitCount(); - Integer queryCommandLogRcpHitCountByCommandId(@Param("command_id")String commandId); void insertCommandHistoryWithStatus(@Param("command_id")String commandUUID, @@ -99,4 +98,13 @@ public interface CommandMapper { List queryRunningCommandsTotalPacketNum(); Integer queryCommandLogTotalPacketNumByCommandId(@Param("command_id")String uuid); + + List queryCommandLogLastTwoRecord(@Param("command_id")String uuid); + + List queryRunningCommands(); + + void insertCommandHistoryWithStatusWithTime(@Param("command_id")String commandUUID, + @Param("log_id")String logId, + @Param("command_status")Integer commandStatus, + @Param("effective_time")LocalDateTime lastTrafficQueryTime); } diff --git a/src/main/java/com/realtime/protection/server/command/CommandService.java b/src/main/java/com/realtime/protection/server/command/CommandService.java index 7eee0a9..4967434 100644 --- a/src/main/java/com/realtime/protection/server/command/CommandService.java +++ b/src/main/java/com/realtime/protection/server/command/CommandService.java @@ -265,7 +265,7 @@ public class CommandService { //研判状态也写入历史表 switch (isJudged){ case 1: - insertCommandHistory(commandId, CommandStatusEnum.SEND.getCommandStatusNum()); + insertCommandHistory(commandId, CommandStatusEnum.JUDGED.getCommandStatusNum()); break; case 2: insertCommandHistory(commandId, CommandStatusEnum.IGNORE.getCommandStatusNum()); @@ -351,8 +351,8 @@ public class CommandService { } - public List queryRunningCommandsRcpHitCount() { - return commandMapper.queryRunningCommandsRcpHitCount(); + public List queryRunningCommands() { + return commandMapper.queryRunningCommands(); } public Integer queryCommandLogRcpHitCountByCommandId(String commandI) { @@ -366,4 +366,13 @@ public class CommandService { public Integer queryCommandLogTotalPacketNumByCommandId(String uuid) { return commandMapper.queryCommandLogTotalPacketNumByCommandId(uuid); } + + public List queryCommandLogLastTwoRecord(String uuid) { + return commandMapper.queryCommandLogLastTwoRecord(uuid); + } + + public void insertCommandHistoryWithTime(String commandUUID, Integer commandStatus, LocalDateTime lastTrafficQueryTime) { + String logId = UUID.randomUUID().toString(); + commandMapper.insertCommandHistoryWithStatusWithTime(commandUUID, logId, commandStatus, lastTrafficQueryTime); + } } diff --git a/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java b/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java index 8d688ba..8abced2 100644 --- a/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java +++ b/src/main/java/com/realtime/protection/server/task/status/StateChangeService.java @@ -157,7 +157,7 @@ public class StateChangeService { continue; } //判断是否全为成功 - if (commandStatusList.stream().allMatch(status -> status == null || status == 1)) { + if (commandStatusList.stream().allMatch(status -> status == null || status == 0)) { //全为成功 try { changeState(StateEnum.RUNNING_SUCCESS.getStateNum(), taskId, true); @@ -165,7 +165,7 @@ public class StateChangeService { log.warn(String.format("任务%d从%s状态变为运行中RUNNING_SUCCESS状态遭遇异常:%s", taskId, taskService.queryTaskStatus(taskId), e.getMessage())); } - } else if (commandStatusList.stream().anyMatch(status -> status == null || status == 1)) { + } else if (commandStatusList.stream().anyMatch(status -> status == null || status == 0)) { //部分成功 try { changeState(StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum(), taskId, true); @@ -187,87 +187,113 @@ public class StateChangeService { /** - * 更新指令rcp命中状态 + * 更新每次指令下周内rcp\flow命中状态 */ - @Scheduled(cron = "0 1/5 * * * ?") + @Scheduled(cron = "5/20 * * * * ?") @Async protected void updateCommandRCPHitStatus() { //获取所有正在下发的指令 - List commandRcpHitCountList = commandService.queryRunningCommandsRcpHitCount(); - log.info("rcp命中状态更新:正在下发的指令数量:{}", commandRcpHitCountList.size()); + List runningCommandList = commandService.queryRunningCommands(); + log.info("rcp命中状态更新:正在下发的指令数量:{}", runningCommandList.size()); + + //遍历所有指令 - for (TaskCommandInfo taskCommandInfo : commandRcpHitCountList) { - //查询当前指令的rcp-hitcount字段 - Long rcpHitCount = taskCommandInfo.getRcpHitCount(); - //查询当前历史表中该指令上一条日志的rcp-hitcount字段 - Integer lastRcpHitCount = commandService.queryCommandLogRcpHitCountByCommandId(taskCommandInfo.getUUID()); - //比较是否有变化 - if (rcpHitCount == null ) { - //更新log表,rcp没命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum()); + for (TaskCommandInfo taskCommandInfo : runningCommandList) { + //查询状态表,查看本次下发周期内是否已经记录了命中状态。查询最近的两条日志记录 + List commandLogs = commandService.queryCommandLogLastTwoRecord(taskCommandInfo.getUUID()); + if (commandLogs.size()<2){ continue; } - if (lastRcpHitCount == null){ - //更新log表,rcp命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum()); + TaskCommandInfo commandLogLatest = commandLogs.get(0); + TaskCommandInfo commandLogLatest2 = commandLogs.get(1); + log.info("指令历史状态:{}", commandLogs); + + + //如果日志状态为SEND、SUCCESS 或者 SEND、FAIL,则要根据当前t_command表的最新记录,和上次下发状态的命中计数相比判断是否有变化 + if (commandLogLatest2.getCommandStatus()==CommandStatusEnum.SEND.getCommandStatusNum() + && (commandLogLatest.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum() + || commandLogLatest.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum())) + { + log.info("rcp命中查询:{},{}",taskCommandInfo.getRcpHitCount(), commandLogLatest.getRcpHitCount()); + if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()==null) { + //更新log表,rcp命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime()); + } + if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()!=null) { + if (taskCommandInfo.getRcpHitCount() > commandLogLatest.getRcpHitCount()) { + //更新log表,rcp命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime()); + } + } + + + log.info("流量命中查询:{},{}",taskCommandInfo.getTotalPacketNum(), commandLogLatest.getTotalPacketNum()); + if (taskCommandInfo.getTotalPacketNum()==null && commandLogLatest.getTotalPacketNum()!=null) { + //更新log表,流量命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime()); + } + if (taskCommandInfo.getTotalPacketNum()!=null && commandLogLatest.getTotalPacketNum()!=null) { + if (taskCommandInfo.getTotalPacketNum() > commandLogLatest.getTotalPacketNum()) { + //更新log表,流量命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime()); + } + } + continue; } - if (rcpHitCount > lastRcpHitCount) { - //更新log表,rcp命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum()); - } else { - //更新log表,rcp没命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum()); - } + //如果日志状态为SUCCESS/FAIL、RCPHIT/UNRCPHIT,则判断是否更新流量命中状态 + if ((commandLogLatest2.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum() + || commandLogLatest2.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum()) + && (commandLogLatest.getCommandStatus()==CommandStatusEnum.RCPHIT.getCommandStatusNum() + || commandLogLatest.getCommandStatus()==CommandStatusEnum.UNRCPHIT.getCommandStatusNum())) + { + + log.info("流量命中查询:{},{}",taskCommandInfo.getTotalPacketNum(), commandLogLatest.getTotalPacketNum()); + if (taskCommandInfo.getTotalPacketNum()==null && commandLogLatest.getTotalPacketNum()!=null) { + //更新log表,流量命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime()); + } + if (taskCommandInfo.getTotalPacketNum()!=null && commandLogLatest.getTotalPacketNum()!=null) { + if (taskCommandInfo.getTotalPacketNum() > commandLogLatest.getTotalPacketNum()) { + //更新log表,流量命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime()); + } + } + continue; + } + //如果日志状态为SUCCESS/FAIL、FLOWHIT/UNFLOWHIT,则判断是否更新rcp命中状态 + if ((commandLogLatest2.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum() + || commandLogLatest2.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum()) + && (commandLogLatest.getCommandStatus()==CommandStatusEnum.FLOWHIT.getCommandStatusNum() + || commandLogLatest.getCommandStatus()==CommandStatusEnum.UNFLOWHIT.getCommandStatusNum())) + { + log.info("rcp命中查询:{},{}",taskCommandInfo.getRcpHitCount(), commandLogLatest.getRcpHitCount()); + if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()==null) { + //更新log表,rcp命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime()); + } + if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()!=null) { + if (taskCommandInfo.getRcpHitCount() > commandLogLatest.getRcpHitCount()) { + //更新log表,rcp命中 + commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(), + CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime()); + } + } + continue; + } + //如果日志状态为RCPHIT/FLOWHIT、FLOWHIT/UNFLOWHIT,则不需要查询了 } } - /** - * 更新指令rcp命中状态 - */ - @Scheduled(cron = "0 3/5 * * * ?") - @Async - protected void updateCommandTrafficHitStatus() { - //查询当前历史表中该指令上一条日志的统计流量大小字段 - - //比较是否有变化 - - //更新log表,流量命中、流量未命中 - //获取所有正在下发的指令 - List commandTotalPacketNumList = commandService.queryRunningCommandsTotalPacketNum(); - log.info("流量命中状态:查询正在下发的指令数量:{}", commandTotalPacketNumList.size()); - //遍历所有指令 - for (TaskCommandInfo taskCommandInfo : commandTotalPacketNumList) { - //查询当前指令的统计流量大小字段 - Long totalPacketNum = taskCommandInfo.getTotalPacketNum(); - //查询当前历史表中该指令上一条日志的rcp-hitcount字段 - Integer lastTotalPacketNum = commandService.queryCommandLogTotalPacketNumByCommandId(taskCommandInfo.getUUID()); - //比较是否有变化 - if (totalPacketNum == null ) { - //更新log表,rcp没命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum()); - continue; - } - if (lastTotalPacketNum == null){ - //更新log表,rcp命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum()); - continue; - } - - if (totalPacketNum > lastTotalPacketNum) { - //更新log表,rcp命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum()); - } else { - //更新log表,rcp没命中 - commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum()); - } - - } - - } - } diff --git a/src/main/java/com/realtime/protection/server/task/status/StateHandler.java b/src/main/java/com/realtime/protection/server/task/status/StateHandler.java index 55161cd..6b30b7e 100644 --- a/src/main/java/com/realtime/protection/server/task/status/StateHandler.java +++ b/src/main/java/com/realtime/protection/server/task/status/StateHandler.java @@ -277,20 +277,38 @@ public class StateHandler { }) .doOnError(WebClientResponseException.class, res -> success.set(false)); - - Map response = mono.block(Duration.ofSeconds(5)); - - if (response == null) { - return false; - } - response.forEach((commandUUID, responseCode) -> { - log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); - if (responseCode != 0) { - log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); - } - }); - - success.set(true); + // 异步处理响应 + mono.subscribe( + response2 -> { + Map response = response2; + success.set(false); + response.forEach((commandUUID, responseCode) -> { + log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); + if (responseCode != 0) { + log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); + } + }); + success.set(true); + }, + error -> { + // 错误响应处理 + System.err.println("指令首次下发错误: " + error.getMessage()); + success.set(false); + } + ); +// Map response = mono.block(Duration.ofSeconds(5)); +// +// if (response == null) { +// return false; +// } +// response.forEach((commandUUID, responseCode) -> { +// log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); +// if (responseCode != 0) { +// log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); +// } +// }); +// +// success.set(true); return success.get(); } @@ -335,16 +353,6 @@ public class StateHandler { } ); -// if (response == null) { -// log.info("指令首次查询RCP返回为null"); -// return false; -// } -// response.forEach((commandUUID, responseCode) -> { -// log.info("指令首次查询RCP成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); -// if (responseCode != 0) { -// log.warn("指令首次查询RCP失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode); -// } -// }); success.set(true); diff --git a/src/main/resources/mappers/CommandMapper.xml b/src/main/resources/mappers/CommandMapper.xml index e7b00bc..802e25e 100644 --- a/src/main/resources/mappers/CommandMapper.xml +++ b/src/main/resources/mappers/CommandMapper.xml @@ -204,6 +204,7 @@ insert into t_command_log( + log_uuid, effective_time, expire_time, TASK_ID, @@ -255,11 +256,11 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - log_uuid, sip_int, dip_int ) select + #{log_id}, NOW(), NULL, TASK_ID, @@ -311,7 +312,7 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - #{log_id}, + sip_int, dip_int from t_command @@ -319,6 +320,7 @@ insert into t_command_log( + log_uuid, effective_time, expire_time, TASK_ID, @@ -370,12 +372,12 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - log_uuid, sip_int, dip_int, command_status ) select + COMMAND_ID, NOW(), NULL, TASK_ID, @@ -427,7 +429,6 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - COMMAND_ID, sip_int, dip_int, #{command_status} @@ -447,6 +448,7 @@ insert into t_command_log( + log_uuid, effective_time, expire_time, TASK_ID, @@ -498,12 +500,12 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - log_uuid, sip_int, dip_int, command_status ) select + #{log_id}, NOW(), NULL, TASK_ID, @@ -555,7 +557,6 @@ first_effect_time, last_rcp_query_time, last_traffic_query_time, - #{log_id}, sip_int, dip_int, #{command_status} @@ -563,7 +564,6 @@ where COMMAND_ID = #{command_id} - @@ -577,6 +577,12 @@ + + + + + + @@ -880,4 +886,171 @@ ORDER BY effective_time DESC LIMIT 1 + + + + + + + insert into t_command_log( + log_uuid, + effective_time, + expire_time, + TASK_ID, + RULE_ID, + COMMAND_ID, + TASKTYPE, + ADDR_TYPE, + SRC_IP, + SRC_PORT, + DST_IP, + DST_PORT, + PROTOCOL, + MASK_SRC_IP, + MASK_SRC_PORT, + MASK_DST_IP, + MASK_DST_PORT, + MASK_PROTOCOL, + TASK_ACT, + EVENTTYPE, + TASKNAME, + DISTRIBUTEPOINT, + DEPARTMENT, + FREQUENCY, + VALID_TIME, + INVALID_TIME, + IS_VALID, + IS_JUDGED, + SEND_TIMES, + SUCCESS_TIMES, + FIRST_SEND_TIME, + LAST_SEND_TIME, + CREATE_TIME, + LAST_UPDATE, + IS_DELETED, + RULE_NAME, + RCP_HIT_COUNT, + TOTAL_PACKET_NUM, + TOTAL_BYTE_NUM, + EFFECTIVE_EQUIPMENT_NUM, + AVERAGE_LATENCY, + MAX_LATENCY, + MIN_LATENCY, + c2s_pkt_num, + s2c_pkt_num, + c2s_byte_num, + s2c_byte_num, + display_id, + session_num, + first_effect_time, + last_rcp_query_time, + last_traffic_query_time, + sip_int, + dip_int, + command_status + ) + select + #{log_id}, + #{effective_time}, + NULL, + TASK_ID, + RULE_ID, + COMMAND_ID, + TASKTYPE, + ADDR_TYPE, + SRC_IP, + SRC_PORT, + DST_IP, + DST_PORT, + PROTOCOL, + MASK_SRC_IP, + MASK_SRC_PORT, + MASK_DST_IP, + MASK_DST_PORT, + MASK_PROTOCOL, + TASK_ACT, + EVENTTYPE, + TASKNAME, + DISTRIBUTEPOINT, + DEPARTMENT, + FREQUENCY, + VALID_TIME, + INVALID_TIME, + IS_VALID, + IS_JUDGED, + SEND_TIMES, + SUCCESS_TIMES, + FIRST_SEND_TIME, + LAST_SEND_TIME, + CREATE_TIME, + LAST_UPDATE, + IS_DELETED, + RULE_NAME, + RCP_HIT_COUNT, + TOTAL_PACKET_NUM, + TOTAL_BYTE_NUM, + EFFECTIVE_EQUIPMENT_NUM, + AVERAGE_LATENCY, + MAX_LATENCY, + MIN_LATENCY, + c2s_pkt_num, + s2c_pkt_num, + c2s_byte_num, + s2c_byte_num, + display_id, + session_num, + first_effect_time, + last_rcp_query_time, + last_traffic_query_time, + sip_int, + dip_int, + #{command_status} + from t_command + where COMMAND_ID = #{command_id} + + + + + + + + + + + + + + + + + + + + + + + + +