fix: 修复 获取job 执行结果重复执行问题

This commit is contained in:
zhangshuai
2024-11-07 14:56:42 +08:00
parent bad7573f5d
commit 19090ae43d

View File

@@ -55,9 +55,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
private IPackageService packageService;
// 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程
private final Map<String, Thread> runningJobThreads = new ConcurrentHashMap<>();
private List runningJobThreads = T.ListUtil.list(false);
private final Map<String, Thread> resultJobThreads = new ConcurrentHashMap<>();
private List resultJobThreads = T.ListUtil.list(false);
@Override
@@ -111,15 +111,24 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
switch (status){
case "running":
runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment));
if (!runningJobThreads.contains(id)){
runningJobThreads.add(id);
startJobVirtualThread(job, environment);
}
break;
case "error":
resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment));
//this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment);
updateJobStatus(job, RunnerConstant.JobStatus.FAILED.getValue());
if (!resultJobThreads.contains(id)){
resultJobThreads.add(id);
getJobResult(job, environment);
}
break;
case "done":
resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment));
//this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment);
updateJobStatus(job, RunnerConstant.JobStatus.PASSED.getValue());
if (!resultJobThreads.contains(id)){
resultJobThreads.add(id);
getJobResult(job, environment);
}
break;
}
}
@@ -128,8 +137,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
}
}
private Thread startJobVirtualThread(JobEntity job, EnvironmentEntity environment) {
Thread virtualThread = Thread.ofVirtual().start(() -> {
private void startJobVirtualThread(JobEntity job, EnvironmentEntity environment) {
T.ThreadUtil.execAsync(() -> {
try {
while (true) {
if (isJobInRunningStatus(job)) {
@@ -146,7 +155,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
Thread.currentThread().interrupt(); // 恢复中断状态
}
});
return virtualThread;
}
// 检查 Job 的状态是否为 running
@@ -192,8 +200,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
@PreDestroy
public void shutdown() {
runningJobThreads.values().forEach(Thread::interrupt);
resultJobThreads.values().forEach(Thread::interrupt);
runningJobThreads.clear();
resultJobThreads.clear();
}
@@ -205,10 +211,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
* @param value job status: error and done
* @param environment
*/
private Thread getJobResult(JobEntity job, String value, EnvironmentEntity environment) {
Thread resultJobThread = Thread.ofVirtual().start(() -> {
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value);
private void getJobResult(JobEntity job, EnvironmentEntity environment) {
T.ThreadUtil.execAsync(() -> {
File destination = null;
File pcapDestination = null;
InputStream inputStream = null;
@@ -246,6 +250,7 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
}
job.setPcapId(pcapEntity.getId());
jobService.updateById(job);
} else {
// log
inputStream = T.ZipUtil.get(zipFile, entry.getName());
@@ -253,9 +258,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
T.FileUtil.writeFromStream(inputStream, logFile);
}
}
job.setStatus(value);
job.setEndTimestamp(System.currentTimeMillis());
jobService.updateById(job);
}
} catch (Exception e) {
@@ -268,7 +270,12 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
resultJobThreads.remove(job.getId());
}
});
}
return resultJobThread;
private void updateJobStatus(JobEntity job, String value) {
log.info("[playbookExecResultChecker] [updateJobStatus] [jod id: {}] [status: {}]", job.getId(), value);
job.setStatus(value);
job.setEndTimestamp(System.currentTimeMillis());
jobService.updateById(job);
}
}