diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java index d982efd..00c0b73 100644 --- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java @@ -57,6 +57,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { // 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程 private final Map runningJobThreads = new ConcurrentHashMap<>(); + private final Map resultJobThreads = new ConcurrentHashMap<>(); + @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { @@ -112,10 +114,12 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment)); break; case "error": - this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment); + resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment)); + //this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment); break; case "done": - this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment); + resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment)); + //this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment); break; } } @@ -189,7 +193,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { @PreDestroy public void shutdown() { runningJobThreads.values().forEach(Thread::interrupt); + resultJobThreads.values().forEach(Thread::interrupt); runningJobThreads.clear(); + resultJobThreads.clear(); } @@ -199,64 +205,70 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { * @param value job status: error and done * @param environment */ - private void getJobResult(JobEntity job, String value, EnvironmentEntity environment) { - log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value); - File destination = null; - File pcapDestination = null; - InputStream inputStream = null; - ZipFile zipFile = null; - try { - JSONObject paramJSONObject = environment.getParamJSONObject(); - String url = paramJSONObject.getStr("url"); - String token = paramJSONObject.getStr("token"); - HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId())); - request.header("Authorization", token); + private Thread getJobResult(JobEntity job, String value, EnvironmentEntity environment) { - HttpResponse response = request.execute(); + Thread resultJobThread = Thread.ofVirtual().start(() -> { + log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value); + File destination = null; + File pcapDestination = null; + InputStream inputStream = null; + ZipFile zipFile = null; + try { + JSONObject paramJSONObject = environment.getParamJSONObject(); + String url = paramJSONObject.getStr("url"); + String token = paramJSONObject.getStr("token"); + HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId())); + request.header("Authorization", token); - log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus()); + HttpResponse response = request.execute(); - if (response.isOk()) { - destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip")); - T.FileUtil.writeBytes(response.bodyBytes(), destination); - zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8); - List list = zipFile.stream().toList(); - for (ZipEntry entry : list) { - if (entry.getName().endsWith("pcap")) { - PackageEntity packageEntity = packageService.getById(job.getPackageId()); - // pcap name {package.name}-job-{jobId[0:8]}.pcap - String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap"); - pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName); - inputStream = T.ZipUtil.get(zipFile, entry.getName()); - T.FileUtil.writeFromStream(inputStream, pcapDestination); - Resource fileResource = new FileSystemResource(pcapDestination); - // upload pcap file - PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId()); + log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus()); - if (log.isDebugEnabled()) { - log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity)); + if (response.isOk()) { + destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip")); + T.FileUtil.writeBytes(response.bodyBytes(), destination); + zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8); + List list = zipFile.stream().toList(); + for (ZipEntry entry : list) { + if (entry.getName().endsWith("pcap")) { + PackageEntity packageEntity = packageService.getById(job.getPackageId()); + // pcap name {package.name}-job-{jobId[0:8]}.pcap + String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap"); + pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName); + inputStream = T.ZipUtil.get(zipFile, entry.getName()); + T.FileUtil.writeFromStream(inputStream, pcapDestination); + Resource fileResource = new FileSystemResource(pcapDestination); + // upload pcap file + PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId()); + + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity)); + } + + job.setPcapId(pcapEntity.getId()); + } else { + // log + inputStream = T.ZipUtil.get(zipFile, entry.getName()); + File logFile = T.FileUtil.file(job.getLogPath()); + T.FileUtil.writeFromStream(inputStream, logFile); } - - job.setPcapId(pcapEntity.getId()); - }else { - // log - inputStream = T.ZipUtil.get(zipFile, entry.getName()); - File logFile = T.FileUtil.file(job.getLogPath()); - T.FileUtil.writeFromStream(inputStream, logFile); } + job.setStatus(value); + job.setEndTimestamp(System.currentTimeMillis()); + jobService.updateById(job); } - job.setStatus(value); - job.setEndTimestamp(System.currentTimeMillis()); - jobService.updateById(job); - } - } catch (Exception e) { - log.error("[playbookExecResultChecker] [getJobResult] [error]", e); - }finally { - T.IoUtil.close(zipFile); - T.FileUtil.del(destination); - T.FileUtil.del(pcapDestination); - T.IoUtil.close(inputStream); - } + } catch (Exception e) { + log.error("[playbookExecResultChecker] [getJobResult] [error]", e); + } finally { + T.IoUtil.close(zipFile); + T.FileUtil.del(destination); + T.FileUtil.del(pcapDestination); + T.IoUtil.close(inputStream); + resultJobThreads.remove(job.getId()); + } + }); + + return resultJobThread; } }