diff --git a/src/main/java/net/geedge/asw/common/config/job/JobConfig.java b/src/main/java/net/geedge/asw/common/config/job/JobConfig.java index 6c97e7c..1446a4e 100644 --- a/src/main/java/net/geedge/asw/common/config/job/JobConfig.java +++ b/src/main/java/net/geedge/asw/common/config/job/JobConfig.java @@ -70,8 +70,10 @@ public class JobConfig { public void init() throws SchedulerException { // JobEnvironmentStatusChecker createCronScheduleJob(JobEnvironmentStatusChecker(), environment.getProperty("asw.cron.JobEnvironmentStatusChecker", "0 0/1 * * * ? *")); - createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0 0/1 * * * ? *")); - createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/30 * * * * ?")); + // JobPlaybookExecutor + createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0/30 * * * * ?")); + // JobPlaybookExecResultChecker + createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/10 * * * * ?")); } /** diff --git a/src/main/java/net/geedge/asw/common/util/Constants.java b/src/main/java/net/geedge/asw/common/util/Constants.java index 76b9d4f..620856a 100644 --- a/src/main/java/net/geedge/asw/common/util/Constants.java +++ b/src/main/java/net/geedge/asw/common/util/Constants.java @@ -111,11 +111,6 @@ public class Constants { public static final String EMPTY_FILE_MD5 = "d41d8cd98f00b204e9800998ecf8427e"; - /** - * tid -> jobId 用于获取 job 运行结果 - */ - public static final Map PLAYBOOK_EXECUTOR_RESULT = T.MapUtil.newHashMap(); - /** * 系统内置角色 */ diff --git a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java index a40ccd9..c244469 100644 --- a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java +++ b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java @@ -75,4 +75,14 @@ public class JobController { return R.ok(); } + + @GetMapping("/{workspaceId}/job/{id}/log") + public R getJobLog(@PathVariable("workspaceId") String workspaceId, + @PathVariable("id") String id, + @RequestParam Integer offset) { + offset = offset == null ? 0 : offset; + Map result = jobService.queryJobLog(id, offset); + return R.ok().putData("record", result); + } + } \ No newline at end of file diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java index d7c063d..016118a 100644 --- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java @@ -5,6 +5,8 @@ import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import cn.hutool.json.JSONObject; import cn.hutool.log.Log; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import jakarta.annotation.PreDestroy; import net.geedge.asw.common.util.Constants; import net.geedge.asw.common.util.RCode; import net.geedge.asw.common.util.T; @@ -24,12 +26,15 @@ import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; -import org.springframework.http.MediaType; import org.springframework.scheduling.quartz.QuartzJobBean; import java.io.File; +import java.io.InputStream; +import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.*; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; @DisallowConcurrentExecution @@ -49,6 +54,10 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { @Autowired private IPackageService packageService; + // 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程 + private final Map runningJobThreads = new ConcurrentHashMap<>(); + + @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { @@ -68,75 +77,186 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean { } private void playbookExecResultChecker() { - Set> entryList = Constants.PLAYBOOK_EXECUTOR_RESULT.entrySet(); - if (entryList.isEmpty()) { + + List jobList = jobService.list(new LambdaQueryWrapper().eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())); + if (jobList.isEmpty()) { return; } - for (Map.Entry entry : entryList) { + for (JobEntity job : jobList) { Thread.ofVirtual().start(() -> { - String tid = entry.getKey(); - String jobId = entry.getValue(); - JobEntity job = jobService.getById(jobId); + String id = job.getId(); EnvironmentEntity environment = environmentService.getById(job.getEnvId()); - log.info("[playbookExecResultChecker] [tid: {}] [jobId: {}] [envId]", tid, jobId, environment.getId()); + log.info("[playbookExecResultChecker] [jobId: {}] [envId]", id, environment.getId()); JSONObject paramJSONObject = environment.getParamJSONObject(); String url = paramJSONObject.getStr("url"); String token = paramJSONObject.getStr("token"); - HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, tid)); + HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, id)); request.header("Authorization", token); HttpResponse response = request.execute(); - log.info("[playbookExecResultChecker] [env: {}] [status: {}]", environment.getId(), response.getStatus()); + log.info("[playbookExecResultChecker] [request env api] [env: {}] [status: {}]", environment.getId(), response.getStatus()); - File destination = null; if (response.isOk()) { - // file - if (MediaType.APPLICATION_OCTET_STREAM_VALUE.equals(response.header(Header.CONTENT_TYPE.getValue()))) { + String result = response.body(); + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result); + } + JSONObject jsonObject = T.JSONUtil.parseObj(result); + if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { + JSONObject data = jsonObject.getJSONObject("data"); + String status = data.getStr("status"); - PackageEntity packageEntity = packageService.getById(job.getPackageId()); - // pcap name {package.name}-job-{jobId[0:8]}.pcap - String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED , T.StrUtil.sub(jobId, 0, 8), ".pcap"); - if (log.isDebugEnabled()) { - log.debug("[playbookExecResultChecker] [env: {}] [result fileName: {}]", environment.getId(), fileName); - } - - destination = T.FileUtil.file(Constants.TEMP_PATH, fileName); - T.FileUtil.writeBytes(response.bodyBytes(), destination); - Resource fileResource = new FileSystemResource(destination); - // upload pcap file - PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId()); - job.setPcapId(pcapEntity.getId()); - job.setStatus(RunnerConstant.JobStatus.PASSED.getValue()); - job.setEndTimestamp(System.currentTimeMillis()); - job.setUpdateTimestamp(System.currentTimeMillis()); - jobService.updateById(job); - Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid); - } else { - - String result = response.body(); - if (log.isDebugEnabled()) { - log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result); - } - - JSONObject jsonObject = T.JSONUtil.parseObj(result); - if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { - JSONObject data = jsonObject.getJSONObject("data"); - String status = data.getStr("status"); - if (!RunnerConstant.JobStatus.RUNNING.getValue().equals(status)) { - job.setStatus(RunnerConstant.JobStatus.FAILED.getValue()); - job.setUpdateTimestamp(System.currentTimeMillis()); - job.setEndTimestamp(System.currentTimeMillis()); - jobService.updateById(job); - Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid); - } + switch (status){ + case "running": + runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment)); + break; + case "error": + this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment); + break; + case "done": + this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment); + break; } } } - if (destination != null) { - T.FileUtil.del(destination); - } }); } } + + private Thread startJobVirtualThread(JobEntity job, EnvironmentEntity environment) { + Thread virtualThread = Thread.ofVirtual().start(() -> { + try { + while (true) { + if (isJobInRunningStatus(job)) { + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [Job status updated] [stopJobVirtualThread ] [job id: {}]", job.getId()); + } + runningJobThreads.remove(job.getId()); + break; + } + performJobLogic(job, environment); + Thread.sleep(2000); // 每 2 秒执行一次 + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + } + }); + return virtualThread; + } + + // 检查 Job 的状态是否为 running + private boolean isJobInRunningStatus(JobEntity jobEntity) { + JobEntity job = jobService.getById(jobEntity.getId()); + return job != null && !T.StrUtil.equalsIgnoreCase(job.getStatus(), RunnerConstant.JobStatus.RUNNING.getValue()); + } + + /** + * 获取 playbook 执行日志 + * @param job + * @param environment + */ + private void performJobLogic(JobEntity job, EnvironmentEntity environment) { + File logFile = T.FileUtil.file(job.getLogPath()); + Integer offset = 0; + if (logFile.exists()){ + offset = T.FileUtil.readBytes(logFile).length; + } + JSONObject paramJSONObject = environment.getParamJSONObject(); + String url = paramJSONObject.getStr("url"); + String token = paramJSONObject.getStr("token"); + HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/log", url, job.getId())); + request.form("offset", offset); + request.header("Authorization", token); + + HttpResponse response = request.execute(); + + if (response.isOk()) { + String result = response.body(); + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [performJobLogic] [env: {}] [result: {}]", environment.getId(), result); + } + JSONObject jsonObject = T.JSONUtil.parseObj(result); + if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { + JSONObject data = jsonObject.getJSONObject("data"); + String content = data.getStr("content"); + T.FileUtil.appendString(content, logFile, "UTF-8"); + } + } + } + + + @PreDestroy + public void shutdown() { + runningJobThreads.values().forEach(Thread::interrupt); + runningJobThreads.clear(); + } + + + /** + * get pcap log + * @param job + * @param value job status: error and done + * @param environment + */ + private void getJobResult(JobEntity job, String value, EnvironmentEntity environment) { + log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value); + File destination = null; + File pcapDestination = null; + InputStream inputStream = null; + ZipFile zipFile = null; + try { + JSONObject paramJSONObject = environment.getParamJSONObject(); + String url = paramJSONObject.getStr("url"); + String token = paramJSONObject.getStr("token"); + HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId())); + request.header("Authorization", token); + + HttpResponse response = request.execute(); + + log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus()); + + if (response.isOk()) { + destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip")); + T.FileUtil.writeBytes(response.bodyBytes(), destination); + zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8); + List list = zipFile.stream().toList(); + for (ZipEntry entry : list) { + if (entry.getName().endsWith("pcap")) { + PackageEntity packageEntity = packageService.getById(job.getPackageId()); + // pcap name {package.name}-job-{jobId[0:8]}.pcap + String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap"); + pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName); + inputStream = T.ZipUtil.get(zipFile, entry.getName()); + T.FileUtil.writeFromStream(inputStream, pcapDestination); + Resource fileResource = new FileSystemResource(pcapDestination); + // upload pcap file + PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId()); + + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity)); + } + + job.setPcapId(pcapEntity.getId()); + }else { + // log + inputStream = T.ZipUtil.get(zipFile, entry.getName()); + File logFile = T.FileUtil.file(job.getLogPath()); + T.FileUtil.writeFromStream(inputStream, logFile); + } + } + job.setStatus(value); + job.setUpdateTimestamp(System.currentTimeMillis()); + jobService.updateById(job); + } + + } catch (Exception e) { + log.error("[playbookExecResultChecker] [getJobResult] [error]", e); + }finally { + T.IoUtil.close(zipFile); + T.FileUtil.del(destination); + T.FileUtil.del(pcapDestination); + T.IoUtil.close(inputStream); + } + } } diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java index 58ee133..5860318 100644 --- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java @@ -86,7 +86,6 @@ public class JobPlaybookExecutor extends QuartzJobBean { continue; } - String result = null; String playbookId = job.getPlaybookId(); String packageId = job.getPackageId(); PackageEntity packageEntity = packageService.getById(packageId); @@ -101,41 +100,42 @@ public class JobPlaybookExecutor extends QuartzJobBean { String url = paramJSONObject.getStr("url"); String token = paramJSONObject.getStr("token"); + File zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip")); + HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url)); - request.form("files", packageFile, playbookFile); + T.ZipUtil.zip(zipFile, true, packageFile, playbookFile); + request.form("file", zipFile); + request.form("id", job.getId()); request.form("packageName", packageName); request.header("Authorization", token); HttpResponse response = request.execute(); - log.info("[playbookExecutor] [env] [status: {}]", environment.getId(), response.getStatus()); + log.info("[playbookExecutor] [env: {}] [status: {}]", environment.getId(), response.getStatus()); if (response.isOk()) { - result = response.body(); - } + // update job status, starTime, updateTimestamp + jobService.update(new LambdaUpdateWrapper() + .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()) + .set(JobEntity::getStartTimestamp, System.currentTimeMillis()) + .eq(JobEntity::getId, job.getId()) + ); - if (log.isDebugEnabled()) { - log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result); - } - - if (T.StrUtil.isNotEmpty(result)) { - try { - JSONObject jsonObject = T.JSONUtil.parseObj(result); - if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { - JSONObject data = jsonObject.getJSONObject("data"); - String tid = data.getStr("tid"); - Constants.PLAYBOOK_EXECUTOR_RESULT.put(tid, job.getId()); - } - } catch (Exception e) { - log.error(e, "[playbookExecutor] [parse result error] [result: {}]", job.getId(), result); + }else { + String result = response.body(); + if (log.isDebugEnabled()) { + log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result); } - } - // update job status, starTime, updateTimestamp - jobService.update(new LambdaUpdateWrapper() - .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()) - .set(JobEntity::getUpdateTimestamp, System.currentTimeMillis()) - .set(JobEntity::getStartTimestamp, System.currentTimeMillis()) - .eq(JobEntity::getId, job.getId()) - ); + File logFile = T.FileUtil.file(job.getLogPath()); + T.FileUtil.appendString(String.format("ERROR: Request %s environment error \n", environment.getId()), logFile, "UTF-8"); + T.FileUtil.appendString(String.format("Result: %s", result), logFile, "UTF-8"); + + // update job status, starTime, updateTimestamp + jobService.update(new LambdaUpdateWrapper() + .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue()) + .set(JobEntity::getStartTimestamp, System.currentTimeMillis()) + .set(JobEntity::getEndTimestamp, System.currentTimeMillis()) + .eq(JobEntity::getId, job.getId())); + } } }); } diff --git a/src/main/java/net/geedge/asw/module/runner/service/IJobService.java b/src/main/java/net/geedge/asw/module/runner/service/IJobService.java index ec148d9..a8a6db8 100644 --- a/src/main/java/net/geedge/asw/module/runner/service/IJobService.java +++ b/src/main/java/net/geedge/asw/module/runner/service/IJobService.java @@ -17,6 +17,8 @@ public interface IJobService extends IService{ void removeJob(List ids); + Map queryJobLog(String id, Integer offset); + // JobEntity assignPendingJob(String id, String platform); // // void appendTraceLogStrToFile(String jobId, String content) throws RuntimeException; diff --git a/src/main/java/net/geedge/asw/module/runner/service/impl/JobServiceImpl.java b/src/main/java/net/geedge/asw/module/runner/service/impl/JobServiceImpl.java index d7a5799..1e060f8 100644 --- a/src/main/java/net/geedge/asw/module/runner/service/impl/JobServiceImpl.java +++ b/src/main/java/net/geedge/asw/module/runner/service/impl/JobServiceImpl.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import net.geedge.asw.common.config.Query; +import net.geedge.asw.common.util.ASWException; import net.geedge.asw.common.util.RCode; import net.geedge.asw.common.util.T; import net.geedge.asw.module.app.entity.PackageEntity; @@ -19,10 +20,13 @@ import net.geedge.asw.module.runner.service.IJobService; import net.geedge.asw.module.runner.service.IPlaybookService; import net.geedge.asw.module.runner.util.RunnerConstant; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.web.servlet.MultipartAutoConfiguration; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.util.List; import java.util.Map; @@ -117,6 +121,35 @@ public class JobServiceImpl extends ServiceImpl implements IJ this.removeBatchByIds(ids); } + @Override + public Map queryJobLog(String id, Integer offset) { + JobEntity job = this.getById(id); + + Map result = T.MapUtil.newHashMap(); + File logFile = T.FileUtil.file(job.getLogPath()); + + if (logFile.exists()){ + try (RandomAccessFile raf = new RandomAccessFile(logFile, "r")) { + raf.seek(offset); + byte[] bytes = new byte[(int)raf.length() - offset]; + raf.readFully(bytes); + String content = new String(bytes); + result.put("content", content); + result.put("length", bytes.length); + result.put("offset", offset + bytes.length); + } catch (IOException e) { + log.error("queryJobLog error", e); + throw new ASWException(RCode.ERROR); + } + }else { + result.put("content", T.StrUtil.EMPTY); + result.put("length", 0); + result.put("offset", 0); + } + + return result; + } + // @Override // public synchronized JobEntity assignPendingJob(String runnerId, String platform) { // if (T.StrUtil.hasEmpty(runnerId, platform)) {