1、新增com.alibaba:druid依赖
2、动态规则类新增sql解析方法,发送dynamicTaskInfos时对sql进行解析。 3、ResponseResult返回响应信息修改为中文 4、动态任务运行状态转变为运行中时,dynamicTaskInfos生成的查询mapper语句修改
This commit is contained in:
@@ -50,6 +50,8 @@ dependencies {
|
||||
implementation 'com.baomidou:dynamic-datasource-spring-boot3-starter:4.3.0'
|
||||
implementation 'com.github.xiaoymin:knife4j-openapi3-jakarta-spring-boot-starter:4.4.0'
|
||||
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
|
||||
implementation 'com.alibaba:druid:1.2.23'
|
||||
|
||||
}
|
||||
|
||||
tasks.named('test') {
|
||||
|
||||
@@ -1,11 +1,21 @@
|
||||
package com.realtime.protection.configuration.entity.task;
|
||||
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelect;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
|
||||
import com.alibaba.druid.sql.parser.SQLStatementParser;
|
||||
import com.alibaba.druid.stat.TableStat;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
public class DynamicTaskInfo {
|
||||
|
||||
@@ -45,10 +55,76 @@ public class DynamicTaskInfo {
|
||||
@JsonProperty("event_type")
|
||||
private String eventType;
|
||||
|
||||
@JsonProperty("log_rule_id")
|
||||
private Long logRuleId;
|
||||
// @JsonProperty("event_ids")
|
||||
// private List<Long> eventIds;
|
||||
//
|
||||
// @JsonProperty("c_inoutid")
|
||||
// private List<Long> inoutId;
|
||||
//
|
||||
// @JsonProperty("c_netnum")
|
||||
// private List<Long> netNum;
|
||||
|
||||
@JsonProperty("conditions")
|
||||
private HashMap<String,String> conditions = new HashMap<>();
|
||||
|
||||
// @JsonProperty("eq_condition")
|
||||
// private HashMap<String,String> eqConditions = new HashMap<>();;
|
||||
|
||||
@JsonProperty("table_name")
|
||||
private List<String> tableNames = new ArrayList<>();
|
||||
|
||||
@JsonProperty("bw_sql")
|
||||
private String bwSql;
|
||||
|
||||
@JsonProperty("select_columns")
|
||||
private List<String> selectColumns = new ArrayList<>();
|
||||
|
||||
// 从防护对象列表中获取
|
||||
@JsonProperty("protect_objects")
|
||||
private List<SimpleProtectObject> protectObjects;
|
||||
|
||||
public void parseSql() {
|
||||
String bwSql = this.bwSql;
|
||||
//解析SQL语句
|
||||
SQLStatementParser parser = new MySqlStatementParser(bwSql);
|
||||
// 使用Parser解析生成AST,这里SQLStatement就是AST
|
||||
SQLStatement sqlStatement = parser.parseStatement();
|
||||
|
||||
// 检查是否是SELECT语句
|
||||
if (sqlStatement instanceof SQLSelectStatement) {
|
||||
// 创建访问者
|
||||
MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
|
||||
sqlStatement.accept(visitor);
|
||||
|
||||
// 进一步细化,确保只获取SELECT子句的字段
|
||||
SQLSelectStatement selectStatement = (SQLSelectStatement) sqlStatement;
|
||||
SQLSelect select = selectStatement.getSelect();
|
||||
SQLSelectQueryBlock queryBlock = (SQLSelectQueryBlock) select.getQuery();
|
||||
|
||||
queryBlock.getSelectList().forEach(item -> this.selectColumns.add(item.toString()));
|
||||
|
||||
Map<TableStat.Name, TableStat> tables = visitor.getTables();
|
||||
for (TableStat.Name t : tables.keySet()) {
|
||||
this.tableNames.add(t.getName());
|
||||
}
|
||||
//获取where条件
|
||||
List<TableStat.Condition> conditions = visitor.getConditions();
|
||||
conditions.forEach(condition -> {
|
||||
// log.info("解析sql后的查询条件:{}", condition.getColumn().getName());
|
||||
if (condition.getOperator().equals("IN")){
|
||||
this.conditions.put(condition.getColumn().getName(),condition.getValues().toString());
|
||||
}
|
||||
if (condition.getOperator().equals("=")){
|
||||
this.conditions.put(condition.getColumn().getName(),condition.getValues().toString());
|
||||
}
|
||||
});
|
||||
// log.info("解析sql后的查询条件:{}",conditions);
|
||||
|
||||
} else {
|
||||
throw new IllegalArgumentException("不是SELECT查询语句,不进行字段解析。");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ public class ResponseResult implements Serializable {
|
||||
}
|
||||
|
||||
public static ResponseResult ok() {
|
||||
return new ResponseResult(200, "request succeed");
|
||||
return new ResponseResult(200, "请求成功");
|
||||
}
|
||||
|
||||
public static ResponseResult ok(String message) {
|
||||
@@ -49,11 +49,11 @@ public class ResponseResult implements Serializable {
|
||||
}
|
||||
|
||||
public static ResponseResult error() {
|
||||
return new ResponseResult(500, "request failed");
|
||||
return new ResponseResult(500, "请求失败");
|
||||
}
|
||||
|
||||
public static ResponseResult invalid() {
|
||||
return new ResponseResult(400, "invalid request");
|
||||
return new ResponseResult(400, "请求无效");
|
||||
}
|
||||
|
||||
public static ResponseResult invalid(String message) {
|
||||
@@ -61,7 +61,7 @@ public class ResponseResult implements Serializable {
|
||||
}
|
||||
|
||||
public static ResponseResult unAuthorized() {
|
||||
return new ResponseResult(401, "UnAuthorized User");
|
||||
return new ResponseResult(401, "未授权用户");
|
||||
}
|
||||
|
||||
public static ResponseResult error(String message) {
|
||||
|
||||
@@ -92,7 +92,7 @@ public class TaskService {
|
||||
//校验防护对象是否存在
|
||||
boolean ProtectObjIdValid = task.getProtectObjectIds().stream()
|
||||
.allMatch(dynamicRuleMapper::queryProtectObjectById);
|
||||
if (!ProtectObjIdValid) {
|
||||
if (!ProtectObjIdValid && !task.getProtectObjectIds().isEmpty()){
|
||||
throw new IllegalArgumentException("部分防护对象不存在");
|
||||
}
|
||||
//任务和防护对象多对多关联建立
|
||||
|
||||
@@ -8,11 +8,10 @@ import com.realtime.protection.configuration.utils.enums.TaskTypeEnum;
|
||||
import com.realtime.protection.configuration.utils.enums.audit.AuditStatusEnum;
|
||||
import com.realtime.protection.server.command.CommandService;
|
||||
import com.realtime.protection.server.task.TaskService;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
||||
import org.springframework.web.reactive.function.client.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
@@ -114,12 +113,18 @@ public class StateHandler {
|
||||
if (dynamicTaskInfos == null || dynamicTaskInfos.isEmpty()) {
|
||||
throw new IllegalArgumentException("动态规则列表为空,请至少选择一个动态规则以启动动态/研判后类型任务");
|
||||
}
|
||||
//解析sql
|
||||
dynamicTaskInfos.forEach(dynamicTaskInfo -> {
|
||||
dynamicTaskInfo.parseSql();
|
||||
});
|
||||
|
||||
// 将所有关联的动态规则审批状态修改为“已使用”
|
||||
taskService.updateDynamicRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.USING);
|
||||
|
||||
AtomicReference<Boolean> success = new AtomicReference<>(false);
|
||||
|
||||
// System.out.println(dynamicTaskInfos);
|
||||
|
||||
Mono<SimpleResponse> mono = client.post()
|
||||
.uri("/api/v1/kafkasend")
|
||||
.bodyValue(dynamicTaskInfos)
|
||||
@@ -144,3 +149,4 @@ public class StateHandler {
|
||||
return success.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
MASK_SRC_IP, MASK_SRC_PORT, MASK_DST_IP, MASK_DST_PORT, MASK_PROTOCOL, VALID_TIME,
|
||||
INVALID_TIME, IS_VALID, IS_JUDGED,
|
||||
SEND_TIMES, SUCCESS_TIMES, CREATE_TIME, LAST_UPDATE, IS_DELETED,
|
||||
TASK_TYPE, RULE_ID)
|
||||
TASKTYPE, RULE_ID)
|
||||
values (#{info.UUID}, #{info.taskId}, #{info.taskAct}, #{info.taskName}, #{info.eventType}, #{info.taskCreateDepart}, #{info.distributePoint},
|
||||
#{info.frequency},
|
||||
DEFAULT,
|
||||
@@ -32,7 +32,7 @@
|
||||
MASK_SRC_IP, MASK_SRC_PORT, MASK_DST_IP, MASK_DST_PORT, MASK_PROTOCOL, VALID_TIME, INVALID_TIME, IS_VALID,
|
||||
IS_JUDGED,
|
||||
SEND_TIMES, SUCCESS_TIMES, CREATE_TIME, LAST_UPDATE, IS_DELETED,
|
||||
TASK_TYPE, RULE_ID)
|
||||
TASKTYPE, RULE_ID)
|
||||
values
|
||||
<foreach collection="command_infos" item="info" separator=",">
|
||||
(#{info.UUID}, #{info.taskId}, #{info.taskAct}, #{info.taskName}, #{info.eventType}, #{info.taskCreateDepart}, #{info.distributePoint},
|
||||
|
||||
@@ -12,11 +12,12 @@
|
||||
(dynamic_rule_name,
|
||||
create_time, modify_time, dynamic_rule_create_username,
|
||||
dynamic_rule_create_depart,
|
||||
dynamic_rule_create_user_id, bw_sql, dynamic_rule_display_id)
|
||||
dynamic_rule_create_user_id, bw_sql, dynamic_rule_display_id, source_system)
|
||||
values (#{object.dynamicRuleName},
|
||||
NOW(), NOW(),
|
||||
#{object.dynamicRuleCreateUsername}, #{object.dynamicRuleCreateDepart},
|
||||
#{object.dynamicRuleCreateUserId}, #{object.bwSql}, #{object.dynamicRuleDisplayId})
|
||||
#{object.dynamicRuleCreateUserId}, #{object.bwSql}, #{object.dynamicRuleDisplayId}
|
||||
,#{object.dynamicRuleSourceSystem})
|
||||
</insert>
|
||||
|
||||
<!-- <insert id="newDynamicRulProtectObjectConcat">-->
|
||||
|
||||
@@ -388,7 +388,7 @@
|
||||
<result column="task_start_time" property="startTime"/>
|
||||
<result column="task_end_time" property="endTime"/>
|
||||
<result column="rule_id" property="ruleId"/>
|
||||
<result column="log_rule_id" property="logRuleId"/>
|
||||
<result column="bw_sql" property="bwSql"/>
|
||||
<result column="source_system" property="sourceSystem"/>
|
||||
<result column="event_type" property="eventType"/>
|
||||
<collection property="protectObjects"
|
||||
@@ -402,24 +402,24 @@
|
||||
|
||||
<select id="getDynamicTaskInfos"
|
||||
resultMap="dynamicTaskInfoMap">
|
||||
SELECT task_id,
|
||||
SELECT tt.task_id,
|
||||
task_start_time,
|
||||
task_end_time,
|
||||
tdr.dynamic_rule_id as rule_id,
|
||||
tst.strategy_template_source_system as source_system,
|
||||
tst.event_type as event_type,
|
||||
tdr.log_rule_id,
|
||||
tt.source_system as source_system,
|
||||
tt.event_type as event_type,
|
||||
tdr.bw_sql as bw_sql,
|
||||
INET_NTOA(protect_object_ip) as protect_object_ip,
|
||||
protect_object_port,
|
||||
protect_object_url,
|
||||
protect_object_protocol
|
||||
FROM t_task AS tt
|
||||
LEFT JOIN realtime_protection.t_dynamic_rule tdr on tt.task_id = tdr.dynamic_rule_used_task_id
|
||||
LEFT JOIN realtime_protection.t_protect_object_dynamic_rule_conn tpodrc
|
||||
on tdr.dynamic_rule_id = tpodrc.dynamic_rule_id
|
||||
LEFT JOIN realtime_protection.t_protect_object tpo on tpo.protect_object_id = tpodrc.protect_object_id
|
||||
LEFT JOIN realtime_protection.t_strategy_template_new tst on tdr.template_id = tst.strategy_template_id
|
||||
WHERE task_id = #{task_id}
|
||||
LEFT JOIN realtime_protection.t_task_protectobject_conn tconn
|
||||
on tt.task_id = tconn.task_id
|
||||
LEFT JOIN realtime_protection.t_protect_object tpo on tpo.protect_object_id = tconn.protect_object_id
|
||||
-- LEFT JOIN realtime_protection.t_strategy_template_new tst on tt.template_id = tst.strategy_template_id
|
||||
WHERE tt.task_id = #{task_id}
|
||||
</select>
|
||||
<select id="queryTaskTotalNum" resultType="java.lang.Integer">
|
||||
SELECT COUNT(*) FROM t_task
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.realtime.protection.ProtectionApplicationTests;
|
||||
import com.realtime.protection.configuration.entity.defense.object.ProtectObject;
|
||||
import com.realtime.protection.configuration.entity.defense.template.Template;
|
||||
import com.realtime.protection.configuration.entity.rule.dynamicrule.DynamicRuleObject;
|
||||
import com.realtime.protection.configuration.entity.task.DynamicTaskInfo;
|
||||
import com.realtime.protection.configuration.entity.task.Task;
|
||||
import com.realtime.protection.server.defense.object.ProtectObjectService;
|
||||
import com.realtime.protection.server.defense.template.TemplateService;
|
||||
@@ -158,4 +159,16 @@ public class DynamicRuleServiceTest extends ProtectionApplicationTests {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void testParseSql(){
|
||||
List<DynamicTaskInfo> dynamicTaskInfos = taskService.getDynamicTaskInfos(43991L);
|
||||
|
||||
dynamicTaskInfos.forEach(dynamicTaskInfo -> {
|
||||
dynamicTaskInfo.parseSql();
|
||||
System.out.println(dynamicTaskInfo.getConditions());
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.realtime.protection.server.task;
|
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
|
||||
import com.realtime.protection.ProtectionApplicationTests;
|
||||
import com.realtime.protection.configuration.entity.defense.object.ProtectObject;
|
||||
import com.realtime.protection.configuration.entity.defense.template.TemplateNew;
|
||||
@@ -226,7 +227,7 @@ class TaskServiceTest extends ProtectionApplicationTests {
|
||||
}
|
||||
|
||||
|
||||
@Transactional
|
||||
@DSTransactional
|
||||
@Test
|
||||
void testStartStaticTask() throws DorisStartException {
|
||||
|
||||
@@ -305,6 +306,9 @@ class TaskServiceTest extends ProtectionApplicationTests {
|
||||
object.setDynamicRuleName("testStartDynamicTask");
|
||||
object.setDynamicRuleFrequency(1);
|
||||
object.setDynamicRulePriority(1);
|
||||
object.setDynamicRuleSourceSystem("bw");
|
||||
// object.setDynamicRuleSourceSystem("bw");
|
||||
object.setBwSql("select c_src_ipv4,c_src_port,c_dest_ipv4,c_dest_port from topic_xxxxxx where c_event_id in (11113333311,222222222222) and c_netnum = 111111111 and c_flowid=22222222" );
|
||||
// object.setDynamicRuleRange("北京");
|
||||
// object.setDynamicRuleProtectLevel(1);
|
||||
// object.setLogRuleId(1L);
|
||||
@@ -316,31 +320,32 @@ class TaskServiceTest extends ProtectionApplicationTests {
|
||||
|
||||
|
||||
Task task = new Task();
|
||||
task.setTaskName("生产告警信息测试2");
|
||||
task.setTaskName("test动态任务sql");
|
||||
LocalDateTime taskStartTime = LocalDateTime.now().plusMinutes(1);
|
||||
LocalDateTime taskEndTime = LocalDateTime.now().plusYears(5);
|
||||
task.setTaskStartTime(taskStartTime);
|
||||
task.setTaskEndTime(taskEndTime);
|
||||
task.setTaskAct("阻断");
|
||||
task.setTaskAct("23");
|
||||
task.setTaskType(2);
|
||||
task.setTaskRange("1009");
|
||||
task.setTaskRange("1007");
|
||||
task.setTaskCreateUserId(1);
|
||||
task.setTaskCreateUsername("xxx");
|
||||
task.setTaskCreateDepart("xxx");
|
||||
task.setDynamicRuleIds(List.of(new Integer[]{dynamicRuleId}));
|
||||
task.setEventType("ddos");
|
||||
task.setProtectLevel(1);
|
||||
task.setEventType(templates.get(0).getEventType());
|
||||
task.setProtectLevel(Integer.valueOf(templates.get(0).getProtectLevel()));
|
||||
task.setTemplateName(templates.get(0).getTemplateName());
|
||||
task.setSourceSystem("bw");
|
||||
task.setSourceSystem(templates.get(0).getSourceSystem());
|
||||
task.setTemplateId(templates.get(0).getTemplateId());
|
||||
|
||||
task.setProtectObjectIds(List.of(new Integer[]{protectObject.get(0).getProtectObjectId()}));
|
||||
|
||||
Long taskId = taskService.newTask(task);
|
||||
System.out.println(taskId);
|
||||
//审核状态
|
||||
// taskService.changeTaskAuditStatus(taskId, 2);
|
||||
taskService.changeTaskAuditStatus(taskId, 2);
|
||||
//启动任务
|
||||
// stateChangeService.changeState(2, taskId, false);
|
||||
stateChangeService.changeState(2, taskId, false);
|
||||
|
||||
// System.out.println(commandService.queryCommandInfos(taskId, null, null, null, null, 1, 5));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user