指令状态获取方式修改
This commit is contained in:
PushM
2024-08-23 02:43:52 +08:00
parent f6883318e7
commit 4e9c4af247
7 changed files with 345 additions and 112 deletions

View File

@@ -131,6 +131,14 @@ public class TaskCommandInfo {
private Long totalPacketNum;
private int commandStatus;
private LocalDateTime lastTrafficQueryTime;
private LocalDateTime lastRCPQueryTime;
private String logUUID;
// private String hashValue;

View File

@@ -8,19 +8,20 @@ import java.util.Map;
@Getter
public enum CommandStatusEnum {
START(1),//指令生成(研判后任务需要研判,其他任务可直接下发)
SEND(2),//(已研判)下发
SEND(2),// 下发,发送c3
RCPHIT(3),//..
FLOWHIT(4),//..
UNRCPHIT(5),//..
UNFLOWHIT(6),//..
IGNORE(7),//本次忽略
WHOLEIGNORE(8),//全部忽略
SUCCESS(9),//下发成功
FAIL(10),//下发失败
STOP(11),
PAUSE(12),
CANCEL(13),//撤销
END(14);//任务结束
JUDGED(9),// 研判通过
SUCCESS(10),//下发成功
FAIL(11),//下发失败
STOP(12),
PAUSE(13),
CANCEL(14),//撤销
END(0);//任务结束
private final int commandStatusNum;
private static final Map<Integer, CommandStatusEnum> map = new HashMap<>();

View File

@@ -7,6 +7,7 @@ import com.realtime.protection.configuration.entity.whitelist.WhiteListObject;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
import java.util.List;
@Mapper
@@ -86,8 +87,6 @@ public interface CommandMapper {
List<Integer> queryRunningCommandsDistributeStatusByTaskId(Long taskId);
List<TaskCommandInfo> queryRunningCommandsRcpHitCount();
Integer queryCommandLogRcpHitCountByCommandId(@Param("command_id")String commandId);
void insertCommandHistoryWithStatus(@Param("command_id")String commandUUID,
@@ -99,4 +98,13 @@ public interface CommandMapper {
List<TaskCommandInfo> queryRunningCommandsTotalPacketNum();
Integer queryCommandLogTotalPacketNumByCommandId(@Param("command_id")String uuid);
List<TaskCommandInfo> queryCommandLogLastTwoRecord(@Param("command_id")String uuid);
List<TaskCommandInfo> queryRunningCommands();
void insertCommandHistoryWithStatusWithTime(@Param("command_id")String commandUUID,
@Param("log_id")String logId,
@Param("command_status")Integer commandStatus,
@Param("effective_time")LocalDateTime lastTrafficQueryTime);
}

View File

@@ -265,7 +265,7 @@ public class CommandService {
//研判状态也写入历史表
switch (isJudged){
case 1:
insertCommandHistory(commandId, CommandStatusEnum.SEND.getCommandStatusNum());
insertCommandHistory(commandId, CommandStatusEnum.JUDGED.getCommandStatusNum());
break;
case 2:
insertCommandHistory(commandId, CommandStatusEnum.IGNORE.getCommandStatusNum());
@@ -351,8 +351,8 @@ public class CommandService {
}
public List<TaskCommandInfo> queryRunningCommandsRcpHitCount() {
return commandMapper.queryRunningCommandsRcpHitCount();
public List<TaskCommandInfo> queryRunningCommands() {
return commandMapper.queryRunningCommands();
}
public Integer queryCommandLogRcpHitCountByCommandId(String commandI) {
@@ -366,4 +366,13 @@ public class CommandService {
public Integer queryCommandLogTotalPacketNumByCommandId(String uuid) {
return commandMapper.queryCommandLogTotalPacketNumByCommandId(uuid);
}
public List<TaskCommandInfo> queryCommandLogLastTwoRecord(String uuid) {
return commandMapper.queryCommandLogLastTwoRecord(uuid);
}
public void insertCommandHistoryWithTime(String commandUUID, Integer commandStatus, LocalDateTime lastTrafficQueryTime) {
String logId = UUID.randomUUID().toString();
commandMapper.insertCommandHistoryWithStatusWithTime(commandUUID, logId, commandStatus, lastTrafficQueryTime);
}
}

View File

@@ -157,7 +157,7 @@ public class StateChangeService {
continue;
}
//判断是否全为成功
if (commandStatusList.stream().allMatch(status -> status == null || status == 1)) {
if (commandStatusList.stream().allMatch(status -> status == null || status == 0)) {
//全为成功
try {
changeState(StateEnum.RUNNING_SUCCESS.getStateNum(), taskId, true);
@@ -165,7 +165,7 @@ public class StateChangeService {
log.warn(String.format("任务%d从%s状态变为运行中RUNNING_SUCCESS状态遭遇异常%s",
taskId, taskService.queryTaskStatus(taskId), e.getMessage()));
}
} else if (commandStatusList.stream().anyMatch(status -> status == null || status == 1)) {
} else if (commandStatusList.stream().anyMatch(status -> status == null || status == 0)) {
//部分成功
try {
changeState(StateEnum.RUNNING_PARTIAL_SUCCESS.getStateNum(), taskId, true);
@@ -187,87 +187,113 @@ public class StateChangeService {
/**
* 更新指令rcp命中状态
* 更新每次指令下周内rcp\flow命中状态
*/
@Scheduled(cron = "0 1/5 * * * ?")
@Scheduled(cron = "5/20 * * * * ?")
@Async
protected void updateCommandRCPHitStatus() {
//获取所有正在下发的指令
List<TaskCommandInfo> commandRcpHitCountList = commandService.queryRunningCommandsRcpHitCount();
log.info("rcp命中状态更新正在下发的指令数量{}", commandRcpHitCountList.size());
List<TaskCommandInfo> runningCommandList = commandService.queryRunningCommands();
log.info("rcp命中状态更新正在下发的指令数量{}", runningCommandList.size());
//遍历所有指令
for (TaskCommandInfo taskCommandInfo : commandRcpHitCountList) {
//查询当前指令的rcp-hitcount字段
Long rcpHitCount = taskCommandInfo.getRcpHitCount();
//查询当前历史表中该指令上一条日志的rcp-hitcount字段
Integer lastRcpHitCount = commandService.queryCommandLogRcpHitCountByCommandId(taskCommandInfo.getUUID());
//比较是否有变化
if (rcpHitCount == null ) {
//更新log表rcp没命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum());
for (TaskCommandInfo taskCommandInfo : runningCommandList) {
//查询状态表,查看本次下发周期内是否已经记录了命中状态。查询最近的两条日志记录
List<TaskCommandInfo> commandLogs = commandService.queryCommandLogLastTwoRecord(taskCommandInfo.getUUID());
if (commandLogs.size()<2){
continue;
}
if (lastRcpHitCount == null){
//更新log表rcp命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum());
TaskCommandInfo commandLogLatest = commandLogs.get(0);
TaskCommandInfo commandLogLatest2 = commandLogs.get(1);
log.info("指令历史状态:{}", commandLogs);
//如果日志状态为SEND、SUCCESS 或者 SEND、FAIL则要根据当前t_command表的最新记录和上次下发状态的命中计数相比判断是否有变化
if (commandLogLatest2.getCommandStatus()==CommandStatusEnum.SEND.getCommandStatusNum()
&& (commandLogLatest.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum()
|| commandLogLatest.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum()))
{
log.info("rcp命中查询{}{}",taskCommandInfo.getRcpHitCount(), commandLogLatest.getRcpHitCount());
if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()==null) {
//更新log表rcp命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime());
}
if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()!=null) {
if (taskCommandInfo.getRcpHitCount() > commandLogLatest.getRcpHitCount()) {
//更新log表rcp命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime());
}
}
log.info("流量命中查询:{}{}",taskCommandInfo.getTotalPacketNum(), commandLogLatest.getTotalPacketNum());
if (taskCommandInfo.getTotalPacketNum()==null && commandLogLatest.getTotalPacketNum()!=null) {
//更新log表流量命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime());
}
if (taskCommandInfo.getTotalPacketNum()!=null && commandLogLatest.getTotalPacketNum()!=null) {
if (taskCommandInfo.getTotalPacketNum() > commandLogLatest.getTotalPacketNum()) {
//更新log表流量命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime());
}
}
continue;
}
if (rcpHitCount > lastRcpHitCount) {
//更新log表rcp命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.RCPHIT.getCommandStatusNum());
} else {
//更新log表rcp没命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNRCPHIT.getCommandStatusNum());
}
//如果日志状态为SUCCESS/FAIL、RCPHIT/UNRCPHIT则判断是否更新流量命中状态
if ((commandLogLatest2.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum()
|| commandLogLatest2.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum())
&& (commandLogLatest.getCommandStatus()==CommandStatusEnum.RCPHIT.getCommandStatusNum()
|| commandLogLatest.getCommandStatus()==CommandStatusEnum.UNRCPHIT.getCommandStatusNum()))
{
log.info("流量命中查询:{}{}",taskCommandInfo.getTotalPacketNum(), commandLogLatest.getTotalPacketNum());
if (taskCommandInfo.getTotalPacketNum()==null && commandLogLatest.getTotalPacketNum()!=null) {
//更新log表流量命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime());
}
if (taskCommandInfo.getTotalPacketNum()!=null && commandLogLatest.getTotalPacketNum()!=null) {
if (taskCommandInfo.getTotalPacketNum() > commandLogLatest.getTotalPacketNum()) {
//更新log表流量命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.FLOWHIT.getCommandStatusNum(),taskCommandInfo.getLastTrafficQueryTime());
}
}
continue;
}
//如果日志状态为SUCCESS/FAIL、FLOWHIT/UNFLOWHIT则判断是否更新rcp命中状态
if ((commandLogLatest2.getCommandStatus()==CommandStatusEnum.SUCCESS.getCommandStatusNum()
|| commandLogLatest2.getCommandStatus()==CommandStatusEnum.FAIL.getCommandStatusNum())
&& (commandLogLatest.getCommandStatus()==CommandStatusEnum.FLOWHIT.getCommandStatusNum()
|| commandLogLatest.getCommandStatus()==CommandStatusEnum.UNFLOWHIT.getCommandStatusNum()))
{
log.info("rcp命中查询{}{}",taskCommandInfo.getRcpHitCount(), commandLogLatest.getRcpHitCount());
if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()==null) {
//更新log表rcp命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime());
}
if (taskCommandInfo.getRcpHitCount()!=null && commandLogLatest.getRcpHitCount()!=null) {
if (taskCommandInfo.getRcpHitCount() > commandLogLatest.getRcpHitCount()) {
//更新log表rcp命中
commandService.insertCommandHistoryWithTime(taskCommandInfo.getUUID(),
CommandStatusEnum.RCPHIT.getCommandStatusNum(), taskCommandInfo.getLastRCPQueryTime());
}
}
continue;
}
//如果日志状态为RCPHIT/FLOWHIT、FLOWHIT/UNFLOWHIT则不需要查询了
}
}
/**
* 更新指令rcp命中状态
*/
@Scheduled(cron = "0 3/5 * * * ?")
@Async
protected void updateCommandTrafficHitStatus() {
//查询当前历史表中该指令上一条日志的统计流量大小字段
//比较是否有变化
//更新log表流量命中、流量未命中
//获取所有正在下发的指令
List<TaskCommandInfo> commandTotalPacketNumList = commandService.queryRunningCommandsTotalPacketNum();
log.info("流量命中状态:查询正在下发的指令数量:{}", commandTotalPacketNumList.size());
//遍历所有指令
for (TaskCommandInfo taskCommandInfo : commandTotalPacketNumList) {
//查询当前指令的统计流量大小字段
Long totalPacketNum = taskCommandInfo.getTotalPacketNum();
//查询当前历史表中该指令上一条日志的rcp-hitcount字段
Integer lastTotalPacketNum = commandService.queryCommandLogTotalPacketNumByCommandId(taskCommandInfo.getUUID());
//比较是否有变化
if (totalPacketNum == null ) {
//更新log表rcp没命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum());
continue;
}
if (lastTotalPacketNum == null){
//更新log表rcp命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum());
continue;
}
if (totalPacketNum > lastTotalPacketNum) {
//更新log表rcp命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.FLOWHIT.getCommandStatusNum());
} else {
//更新log表rcp没命中
commandService.insertCommandHistory(taskCommandInfo.getUUID(), CommandStatusEnum.UNFLOWHIT.getCommandStatusNum());
}
}
}
}

View File

@@ -277,20 +277,38 @@ public class StateHandler {
})
.doOnError(WebClientResponseException.class, res -> success.set(false));
Map<String, Integer> response = mono.block(Duration.ofSeconds(5));
if (response == null) {
return false;
}
response.forEach((commandUUID, responseCode) -> {
log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
if (responseCode != 0) {
log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
}
});
success.set(true);
// 异步处理响应
mono.subscribe(
response2 -> {
Map<String, Integer> response = response2;
success.set(false);
response.forEach((commandUUID, responseCode) -> {
log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
if (responseCode != 0) {
log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
}
});
success.set(true);
},
error -> {
// 错误响应处理
System.err.println("指令首次下发错误: " + error.getMessage());
success.set(false);
}
);
// Map<String, Integer> response = mono.block(Duration.ofSeconds(5));
//
// if (response == null) {
// return false;
// }
// response.forEach((commandUUID, responseCode) -> {
// log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
// if (responseCode != 0) {
// log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
// }
// });
//
// success.set(true);
return success.get();
}
@@ -335,16 +353,6 @@ public class StateHandler {
}
);
// if (response == null) {
// log.info("指令首次查询RCP返回为null");
// return false;
// }
// response.forEach((commandUUID, responseCode) -> {
// log.info("指令首次查询RCP成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
// if (responseCode != 0) {
// log.warn("指令首次查询RCP失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
// }
// });
success.set(true);

View File

@@ -204,6 +204,7 @@
<insert id="insertCommandHistory">
insert into t_command_log(
log_uuid,
effective_time,
expire_time,
TASK_ID,
@@ -255,11 +256,11 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
log_uuid,
sip_int,
dip_int
)
select
#{log_id},
NOW(),
NULL,
TASK_ID,
@@ -311,7 +312,7 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
#{log_id},
sip_int,
dip_int
from t_command
@@ -319,6 +320,7 @@
</insert>
<insert id="insertCommandHistoryBatch">
insert into t_command_log(
log_uuid,
effective_time,
expire_time,
TASK_ID,
@@ -370,12 +372,12 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
log_uuid,
sip_int,
dip_int,
command_status
)
select
COMMAND_ID,
NOW(),
NULL,
TASK_ID,
@@ -427,7 +429,6 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
COMMAND_ID,
sip_int,
dip_int,
#{command_status}
@@ -447,6 +448,7 @@
</insert>
<insert id="insertCommandHistoryWithStatus">
insert into t_command_log(
log_uuid,
effective_time,
expire_time,
TASK_ID,
@@ -498,12 +500,12 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
log_uuid,
sip_int,
dip_int,
command_status
)
select
#{log_id},
NOW(),
NULL,
TASK_ID,
@@ -555,7 +557,6 @@
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
#{log_id},
sip_int,
dip_int,
#{command_status}
@@ -563,7 +564,6 @@
where COMMAND_ID = #{command_id}
</insert>
<resultMap id="commandStatMap" type="com.realtime.protection.configuration.entity.task.TaskCommandInfo">
<id column="COMMAND_ID" property="UUID"/>
<result column="TASK_ACT" property="taskAct"/>
@@ -577,6 +577,12 @@
<result column="RCP_HIT_COUNT" property="rcpHitCount"/>
<result column="TOTAL_PACKET_NUM" property="totalPacketNum"/>
<result column="command_status" property="commandStatus"/>
<result column="last_rcp_query_time" property="lastRCPQueryTime"/>
<result column="last_traffic_query_time" property="lastTrafficQueryTime"/>
<association property="fiveTupleWithMask">
<result column="SRC_IP" property="sourceIP"/>
@@ -880,4 +886,171 @@
ORDER BY effective_time DESC LIMIT 1
</select>
<insert id="insertCommandHistoryWithStatusWithTime">
insert into t_command_log(
log_uuid,
effective_time,
expire_time,
TASK_ID,
RULE_ID,
COMMAND_ID,
TASKTYPE,
ADDR_TYPE,
SRC_IP,
SRC_PORT,
DST_IP,
DST_PORT,
PROTOCOL,
MASK_SRC_IP,
MASK_SRC_PORT,
MASK_DST_IP,
MASK_DST_PORT,
MASK_PROTOCOL,
TASK_ACT,
EVENTTYPE,
TASKNAME,
DISTRIBUTEPOINT,
DEPARTMENT,
FREQUENCY,
VALID_TIME,
INVALID_TIME,
IS_VALID,
IS_JUDGED,
SEND_TIMES,
SUCCESS_TIMES,
FIRST_SEND_TIME,
LAST_SEND_TIME,
CREATE_TIME,
LAST_UPDATE,
IS_DELETED,
RULE_NAME,
RCP_HIT_COUNT,
TOTAL_PACKET_NUM,
TOTAL_BYTE_NUM,
EFFECTIVE_EQUIPMENT_NUM,
AVERAGE_LATENCY,
MAX_LATENCY,
MIN_LATENCY,
c2s_pkt_num,
s2c_pkt_num,
c2s_byte_num,
s2c_byte_num,
display_id,
session_num,
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
sip_int,
dip_int,
command_status
)
select
#{log_id},
#{effective_time},
NULL,
TASK_ID,
RULE_ID,
COMMAND_ID,
TASKTYPE,
ADDR_TYPE,
SRC_IP,
SRC_PORT,
DST_IP,
DST_PORT,
PROTOCOL,
MASK_SRC_IP,
MASK_SRC_PORT,
MASK_DST_IP,
MASK_DST_PORT,
MASK_PROTOCOL,
TASK_ACT,
EVENTTYPE,
TASKNAME,
DISTRIBUTEPOINT,
DEPARTMENT,
FREQUENCY,
VALID_TIME,
INVALID_TIME,
IS_VALID,
IS_JUDGED,
SEND_TIMES,
SUCCESS_TIMES,
FIRST_SEND_TIME,
LAST_SEND_TIME,
CREATE_TIME,
LAST_UPDATE,
IS_DELETED,
RULE_NAME,
RCP_HIT_COUNT,
TOTAL_PACKET_NUM,
TOTAL_BYTE_NUM,
EFFECTIVE_EQUIPMENT_NUM,
AVERAGE_LATENCY,
MAX_LATENCY,
MIN_LATENCY,
c2s_pkt_num,
s2c_pkt_num,
c2s_byte_num,
s2c_byte_num,
display_id,
session_num,
first_effect_time,
last_rcp_query_time,
last_traffic_query_time,
sip_int,
dip_int,
#{command_status}
from t_command
where COMMAND_ID = #{command_id}
</insert>
<resultMap id="commandLogMap" type="com.realtime.protection.configuration.entity.task.TaskCommandInfo">
<id column="log_uuid" property="logUUID"/>
<result column="COMMAND_ID" property="UUID"/>
<result column="RCP_HIT_COUNT" property="rcpHitCount"/>
<result column="TOTAL_PACKET_NUM" property="totalPacketNum"/>
<result column="command_status" property="commandStatus"/>
<result column="last_rcp_query_time" property="lastRCPQueryTime"/>
<result column="last_traffic_query_time" property="lastTrafficQueryTime"/>
</resultMap>
<select id="queryCommandLogLastTwoRecord"
resultMap="commandLogMap">
SELECT log_uuid,
COMMAND_ID,
TOTAL_PACKET_NUM,
RCP_HIT_COUNT,
command_status
FROM t_command_log
WHERE COMMAND_ID = #{command_id}
ORDER BY effective_time DESC LIMIT 2
</select>
<select id="queryRunningCommands"
resultMap="commandStatMap">
SELECT COMMAND_ID,
RCP_HIT_COUNT,
TOTAL_PACKET_NUM,
last_rcp_query_time,
last_traffic_query_time
FROM t_command
WHERE IS_JUDGED = 1
AND IS_VALID = 1
AND IS_DELETED = 0
</select>
</mapper>