fix: 修复获取 job 执行结果重复执行的问题 (fix: prevent the job-result fetch from being executed repeatedly)

This commit is contained in:
zhangshuai
2024-11-07 09:29:07 +08:00
parent 912c0f4ae9
commit bad7573f5d

View File

@@ -57,6 +57,8 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
// Tracks the virtual thread polling a running job; at most one thread per job id.
private final Map<String, Thread> runningJobThreads = new ConcurrentHashMap<>();
// Tracks the virtual thread fetching a finished job's result artifact; the
// computeIfAbsent guard on this map is what stops the result fetch from being
// started more than once for the same job (the purpose of this commit).
private final Map<String, Thread> resultJobThreads = new ConcurrentHashMap<>();
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
@@ -112,10 +114,12 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
// NOTE(review): diff hunk — the lines below interleave the pre-change direct calls
// with the computeIfAbsent replacements introduced by this commit; only the
// computeIfAbsent lines are in the post-commit file.
runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment));
break;
case "error":
// pre-change line (removed by this commit): a direct call here ran the result
// fetch on every scheduler tick while the job stayed in the "error" state
this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment);
resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment));
//this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment);
break;
case "done":
// pre-change line (removed by this commit), same duplicate-execution issue as above
this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment);
resultJobThreads.computeIfAbsent(job.getId(), jobId -> getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment));
//this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment);
break;
}
}
@@ -189,7 +193,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
@PreDestroy
public void shutdown() {
    // Ask every tracked virtual thread (pollers and result fetchers) to stop,
    // then drop the tracking entries so nothing holds on to dead threads.
    for (Thread pollerThread : runningJobThreads.values()) {
        pollerThread.interrupt();
    }
    for (Thread fetcherThread : resultJobThreads.values()) {
        fetcherThread.interrupt();
    }
    runningJobThreads.clear();
    resultJobThreads.clear();
}
@@ -199,64 +205,70 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
* @param value job status: error and done
* @param environment environment whose params supply the env-service URL and auth token
*/
private void getJobResult(JobEntity job, String value, EnvironmentEntity environment) {
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value);
File destination = null;
File pcapDestination = null;
InputStream inputStream = null;
ZipFile zipFile = null;
try {
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId()));
request.header("Authorization", token);
private Thread getJobResult(JobEntity job, String value, EnvironmentEntity environment) {
HttpResponse response = request.execute();
Thread resultJobThread = Thread.ofVirtual().start(() -> {
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value);
File destination = null;
File pcapDestination = null;
InputStream inputStream = null;
ZipFile zipFile = null;
try {
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId()));
request.header("Authorization", token);
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus());
HttpResponse response = request.execute();
if (response.isOk()) {
destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
T.FileUtil.writeBytes(response.bodyBytes(), destination);
zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8);
List<? extends ZipEntry> list = zipFile.stream().toList();
for (ZipEntry entry : list) {
if (entry.getName().endsWith("pcap")) {
PackageEntity packageEntity = packageService.getById(job.getPackageId());
// pcap name {package.name}-job-{jobId[0:8]}.pcap
String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap");
pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName);
inputStream = T.ZipUtil.get(zipFile, entry.getName());
T.FileUtil.writeFromStream(inputStream, pcapDestination);
Resource fileResource = new FileSystemResource(pcapDestination);
// upload pcap file
PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId());
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus());
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity));
if (response.isOk()) {
destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
T.FileUtil.writeBytes(response.bodyBytes(), destination);
zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8);
List<? extends ZipEntry> list = zipFile.stream().toList();
for (ZipEntry entry : list) {
if (entry.getName().endsWith("pcap")) {
PackageEntity packageEntity = packageService.getById(job.getPackageId());
// pcap name {package.name}-job-{jobId[0:8]}.pcap
String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap");
pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName);
inputStream = T.ZipUtil.get(zipFile, entry.getName());
T.FileUtil.writeFromStream(inputStream, pcapDestination);
Resource fileResource = new FileSystemResource(pcapDestination);
// upload pcap file
PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId());
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity));
}
job.setPcapId(pcapEntity.getId());
} else {
// log
inputStream = T.ZipUtil.get(zipFile, entry.getName());
File logFile = T.FileUtil.file(job.getLogPath());
T.FileUtil.writeFromStream(inputStream, logFile);
}
job.setPcapId(pcapEntity.getId());
}else {
// log
inputStream = T.ZipUtil.get(zipFile, entry.getName());
File logFile = T.FileUtil.file(job.getLogPath());
T.FileUtil.writeFromStream(inputStream, logFile);
}
job.setStatus(value);
job.setEndTimestamp(System.currentTimeMillis());
jobService.updateById(job);
}
job.setStatus(value);
job.setEndTimestamp(System.currentTimeMillis());
jobService.updateById(job);
}
} catch (Exception e) {
log.error("[playbookExecResultChecker] [getJobResult] [error]", e);
}finally {
T.IoUtil.close(zipFile);
T.FileUtil.del(destination);
T.FileUtil.del(pcapDestination);
T.IoUtil.close(inputStream);
}
} catch (Exception e) {
log.error("[playbookExecResultChecker] [getJobResult] [error]", e);
} finally {
T.IoUtil.close(zipFile);
T.FileUtil.del(destination);
T.FileUtil.del(pcapDestination);
T.IoUtil.close(inputStream);
resultJobThreads.remove(job.getId());
}
});
return resultJobThread;
}
}