From 19090ae43df041e95c63b27bccac838e75c982c3 Mon Sep 17 00:00:00 2001 From: zhangshuai Date: Thu, 7 Nov 2024 14:56:42 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20=E8=8E=B7=E5=8F=96j?= =?UTF-8?q?ob=20=E6=89=A7=E8=A1=8C=E7=BB=93=E6=9E=9C=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/JobPlaybookExecResultChecker.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java index 00c0b73..5c848dd 100644 --- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java @@ -55,9 +55,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { private IPackageService packageService; // 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程 - private final Map runningJobThreads = new ConcurrentHashMap<>(); + private List runningJobThreads = T.ListUtil.list(false); - private final Map resultJobThreads = new ConcurrentHashMap<>(); + private List resultJobThreads = T.ListUtil.list(false); @Override @@ -111,15 +111,24 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { switch (status){ case "running": - runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment)); + if (!runningJobThreads.contains(id)){ + runningJobThreads.add(id); + startJobVirtualThread(job, environment); + } break; case "error": - resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment)); - //this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment); + updateJobStatus(job, RunnerConstant.JobStatus.FAILED.getValue()); + if (!resultJobThreads.contains(id)){ + resultJobThreads.add(id); + getJobResult(job, environment); + } break; case "done": - resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment)); - //this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment); + updateJobStatus(job, RunnerConstant.JobStatus.PASSED.getValue()); + if (!resultJobThreads.contains(id)){ + resultJobThreads.add(id); + getJobResult(job, environment); + } break; } } @@ -128,8 +137,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { } } - private Thread startJobVirtualThread(JobEntity job, EnvironmentEntity environment) { - Thread virtualThread = Thread.ofVirtual().start(() -> { + private void startJobVirtualThread(JobEntity job, EnvironmentEntity environment) { + T.ThreadUtil.execAsync(() -> { try { while (true) { if (isJobInRunningStatus(job)) { @@ -146,7 +155,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { Thread.currentThread().interrupt(); // 恢复中断状态 } }); - return virtualThread; } // 检查 Job 的状态是否为 running @@ -192,8 +200,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { @PreDestroy public void shutdown() { - runningJobThreads.values().forEach(Thread::interrupt); - resultJobThreads.values().forEach(Thread::interrupt); runningJobThreads.clear(); resultJobThreads.clear(); } @@ -205,10 +211,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { * @param value job status: error and done * @param environment */ - private Thread getJobResult(JobEntity job, String value, EnvironmentEntity environment) { - - Thread resultJobThread = Thread.ofVirtual().start(() -> { - log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value); + private void getJobResult(JobEntity job, EnvironmentEntity environment) { + T.ThreadUtil.execAsync(() -> { File destination = null; File pcapDestination = null; InputStream inputStream = null; @@ -246,6 +250,7 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { } job.setPcapId(pcapEntity.getId()); + jobService.updateById(job); } else { // log inputStream = T.ZipUtil.get(zipFile, entry.getName()); @@ -253,9 +258,6 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { T.FileUtil.writeFromStream(inputStream, logFile); } } - job.setStatus(value); - job.setEndTimestamp(System.currentTimeMillis()); - jobService.updateById(job); } } catch (Exception e) { @@ -268,7 +270,12 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { resultJobThreads.remove(job.getId()); } }); + } - return resultJobThread; + private void updateJobStatus(JobEntity job, String value) { + log.info("[playbookExecResultChecker] [updateJobStatus] [jod id: {}] [status: {}]", job.getId(), value); + job.setStatus(value); + job.setEndTimestamp(System.currentTimeMillis()); + jobService.updateById(job); } }