diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java index 00c0b73..5c848dd 100644 --- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java @@ -55,9 +55,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { private IPackageService packageService; // 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程 - private final Map runningJobThreads = new ConcurrentHashMap<>(); + private List runningJobThreads = T.ListUtil.list(false); - private final Map resultJobThreads = new ConcurrentHashMap<>(); + private List resultJobThreads = T.ListUtil.list(false); @Override @@ -111,15 +111,24 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { switch (status){ case "running": - runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment)); + if (!runningJobThreads.contains(id)){ + runningJobThreads.add(id); + startJobVirtualThread(job, environment); + } break; case "error": - resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment)); - //this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment); + updateJobStatus(job, RunnerConstant.JobStatus.FAILED.getValue()); + if (!resultJobThreads.contains(id)){ + resultJobThreads.add(id); + getJobResult(job, environment); + } break; case "done": - resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment)); - //this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment); + updateJobStatus(job, RunnerConstant.JobStatus.PASSED.getValue()); + if (!resultJobThreads.contains(id)){ + resultJobThreads.add(id); + getJobResult(job, environment); + } break; } } @@ -128,8 +137,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { } } - private Thread startJobVirtualThread(JobEntity job, EnvironmentEntity environment) { - Thread virtualThread = Thread.ofVirtual().start(() -> { + private void startJobVirtualThread(JobEntity job, EnvironmentEntity environment) { + T.ThreadUtil.execAsync(() -> { try { while (true) { if (isJobInRunningStatus(job)) { @@ -146,7 +155,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { Thread.currentThread().interrupt(); // 恢复中断状态 } }); - return virtualThread; } // 检查 Job 的状态是否为 running @@ -192,8 +200,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { @PreDestroy public void shutdown() { - runningJobThreads.values().forEach(Thread::interrupt); - resultJobThreads.values().forEach(Thread::interrupt); runningJobThreads.clear(); resultJobThreads.clear(); } @@ -205,10 +211,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { * @param value job status: error and done * @param environment */ - private Thread getJobResult(JobEntity job, String value, EnvironmentEntity environment) { - - Thread resultJobThread = Thread.ofVirtual().start(() -> { - log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value); + private void getJobResult(JobEntity job, EnvironmentEntity environment) { + T.ThreadUtil.execAsync(() -> { File destination = null; File pcapDestination = null; InputStream inputStream = null; @@ -246,6 +250,7 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { } job.setPcapId(pcapEntity.getId()); + jobService.updateById(job); } else { // log inputStream = T.ZipUtil.get(zipFile, entry.getName()); @@ -253,9 +258,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { T.FileUtil.writeFromStream(inputStream, logFile); } } - job.setStatus(value); - job.setEndTimestamp(System.currentTimeMillis()); - jobService.updateById(job); } } catch (Exception e) { @@ -268,7 +270,12 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { resultJobThreads.remove(job.getId()); } }); + } - return resultJobThread; + private void updateJobStatus(JobEntity job, String value) { + log.info("[playbookExecResultChecker] [updateJobStatus] [jod id: {}] [status: {}]", job.getId(), value); + job.setStatus(value); + job.setEndTimestamp(System.currentTimeMillis()); + jobService.updateById(job); } }