fix: ASW-164 修复 job被取消未释放Environment资源
This commit is contained in:
@@ -1,26 +1,46 @@
|
||||
package net.geedge.asw.module.runner.controller;
|
||||
|
||||
import cn.hutool.http.HttpRequest;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import cn.hutool.http.Method;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.log.Log;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import net.geedge.asw.common.util.Constants;
|
||||
import net.geedge.asw.common.util.R;
|
||||
import net.geedge.asw.common.util.RCode;
|
||||
import net.geedge.asw.common.util.T;
|
||||
import net.geedge.asw.module.environment.entity.EnvironmentEntity;
|
||||
import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
|
||||
import net.geedge.asw.module.environment.service.IEnvironmentService;
|
||||
import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
|
||||
import net.geedge.asw.module.runner.entity.JobEntity;
|
||||
import net.geedge.asw.module.runner.service.IJobService;
|
||||
import net.geedge.asw.module.runner.util.RunnerConstant;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/v1/workspace")
|
||||
public class JobController {
|
||||
private static final Log log = Log.get();
|
||||
|
||||
@Autowired
|
||||
private IJobService jobService;
|
||||
|
||||
@Autowired
|
||||
private IEnvironmentService environmentService;
|
||||
|
||||
@Autowired
|
||||
private IEnvironmentSessionService sessionService;
|
||||
|
||||
@GetMapping("/{workspaceId}/job/{id}")
|
||||
public R detail(@PathVariable("workspaceId") String workspaceId,
|
||||
@PathVariable("id") String id) {
|
||||
@@ -65,14 +85,59 @@ public class JobController {
|
||||
@RequestParam String ids) {
|
||||
T.VerifyUtil.is(ids).notEmpty();
|
||||
List<String> idList = Arrays.asList(ids.split(","));
|
||||
// TODO 其他处理
|
||||
for (String id : idList) {
|
||||
cancelJob(id);
|
||||
}
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
private void cancelJob(String id) {
|
||||
JobEntity job = jobService.getById(id);
|
||||
EnvironmentEntity environment = environmentService.getById(job.getEnvId());
|
||||
log.info("[cancelJob] [jobId: {}]", id);
|
||||
|
||||
JSONObject paramJSONObject = environment.getParamJSONObject();
|
||||
String url = paramJSONObject.getStr("url");
|
||||
String token = paramJSONObject.getStr("token");
|
||||
|
||||
HttpRequest requestStatus = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, id));
|
||||
requestStatus.header("Authorization", token);
|
||||
|
||||
if (job.getStatus().contains(RunnerConstant.JobStatus.RUNNING.getValue())){
|
||||
while (true){
|
||||
HttpResponse response = requestStatus.execute();
|
||||
if (response.isOk()){
|
||||
break;
|
||||
}
|
||||
}
|
||||
HttpRequest request = T.HttpUtil.createRequest(Method.DELETE, String.format("%s/api/v1/env/playbook/%s", url, id));
|
||||
request.header("Authorization", token);
|
||||
HttpResponse response = request.execute();
|
||||
log.info("[cancelJob] [request env stop playbook] [status: {}]", response.body());
|
||||
}
|
||||
|
||||
Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
|
||||
if (runningThread != null) {
|
||||
runningThread.interrupt();
|
||||
}
|
||||
|
||||
Thread resultThread = Constants.RESULT_JOB_THREAD.get(id);
|
||||
if (resultThread != null) {
|
||||
resultThread.interrupt();
|
||||
}
|
||||
EnvironmentSessionEntity session = sessionService.getOne(new LambdaQueryWrapper<EnvironmentSessionEntity>()
|
||||
.eq(EnvironmentSessionEntity::getJobId, id)
|
||||
.eq(EnvironmentSessionEntity::getStatus, 1));
|
||||
|
||||
if (T.ObjectUtil.isNotEmpty(session)) {
|
||||
environmentService.removeSession(session.getId());
|
||||
}
|
||||
|
||||
// update state
|
||||
jobService.update(new LambdaUpdateWrapper<JobEntity>()
|
||||
.in(JobEntity::getId, idList)
|
||||
.eq(JobEntity::getId, id)
|
||||
.set(JobEntity::getStatus, "cancel")
|
||||
);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -132,6 +132,7 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
|
||||
Thread.sleep(2000); // 每 2 秒执行一次
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.info("[playbookExecResultChecker] [startGetJobLogThread] [stop thread] [job id: {}]", job.getId());
|
||||
Constants.RUNNING_JOB_THREAD.remove(job.getId());
|
||||
Thread.currentThread().interrupt(); // 恢复中断状态
|
||||
}
|
||||
@@ -212,6 +213,11 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
|
||||
zipFile = new ZipFile(destination);
|
||||
List<FileHeader> fileHeaders = zipFile.getFileHeaders();
|
||||
for (FileHeader fileHeader : fileHeaders) {
|
||||
// 检查中断状态
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
log.info("[playbookExecResultChecker] [startGetJobResultThread] [stop thread] [job id: {}]", job.getId());
|
||||
return; // 中断线程,退出循环
|
||||
}
|
||||
// 处理 pcap 文件
|
||||
if (fileHeader.getFileName().endsWith("pcap")) {
|
||||
PackageEntity packageEntity = packageService.getById(job.getPackageId());
|
||||
@@ -269,6 +275,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
|
||||
log.info("[playbookExecResultChecker] [startGetJobResultThread] [finshed] [job id: {}]", job.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (e.getMessage().contains("Closed by interrupt")) {
|
||||
Thread.currentThread().interrupt(); // 恢复中断状态
|
||||
}
|
||||
log.error("[playbookExecResultChecker] [startGetJobResultThread] [error]", e);
|
||||
} finally {
|
||||
T.IoUtil.close(zipFile);
|
||||
|
||||
Reference in New Issue
Block a user