From e68a16a500c71f7f06a10bf9671759675edb542e Mon Sep 17 00:00:00 2001 From: zhangshuai Date: Mon, 21 Oct 2024 16:45:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ASW-100=20job=20=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=8A=9F=E8=83=BD=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../asw/common/config/job/JobConfig.java | 20 +++ .../net/geedge/asw/common/util/Constants.java | 5 + .../job/JobPlaybookExecResultChecker.java | 133 ++++++++++++++++ .../runner/job/JobPlaybookExecutor.java | 143 ++++++++++++++++++ .../runner/service/impl/PcapServiceImpl.java | 6 +- 5 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java create mode 100644 src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java diff --git a/src/main/java/net/geedge/asw/common/config/job/JobConfig.java b/src/main/java/net/geedge/asw/common/config/job/JobConfig.java index daf9101..6c97e7c 100644 --- a/src/main/java/net/geedge/asw/common/config/job/JobConfig.java +++ b/src/main/java/net/geedge/asw/common/config/job/JobConfig.java @@ -4,6 +4,8 @@ import cn.hutool.log.Log; import jakarta.annotation.PostConstruct; import net.geedge.asw.common.util.T; import net.geedge.asw.module.environment.job.JobEnvironmentStatusChecker; +import net.geedge.asw.module.runner.job.JobPlaybookExecResultChecker; +import net.geedge.asw.module.runner.job.JobPlaybookExecutor; import net.geedge.asw.module.sys.service.ISysConfigService; import org.quartz.*; import org.springframework.beans.factory.annotation.Autowired; @@ -48,10 +50,28 @@ public class JobConfig { .build(); } + @Bean + public JobDetail JobPlaybookExecutor() { + return JobBuilder.newJob(JobPlaybookExecutor.class) + .withIdentity(getJobKey(JobPlaybookExecutor.class.getSimpleName())) + .storeDurably() + .build(); + } + + @Bean + public JobDetail JobPlaybookExecResultChecker() { + return JobBuilder.newJob(JobPlaybookExecResultChecker.class) + .withIdentity(getJobKey(JobPlaybookExecResultChecker.class.getSimpleName())) + .storeDurably() + .build(); + } + @PostConstruct public void init() throws SchedulerException { // JobEnvironmentStatusChecker createCronScheduleJob(JobEnvironmentStatusChecker(), environment.getProperty("asw.cron.JobEnvironmentStatusChecker", "0 0/1 * * * ? *")); + createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0 0/1 * * * ? *")); + createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/30 * * * * ?")); } /** diff --git a/src/main/java/net/geedge/asw/common/util/Constants.java b/src/main/java/net/geedge/asw/common/util/Constants.java index 2fe4f98..d12e233 100644 --- a/src/main/java/net/geedge/asw/common/util/Constants.java +++ b/src/main/java/net/geedge/asw/common/util/Constants.java @@ -110,4 +110,9 @@ public class Constants { public static final List ANDROID_PACKAGE_TYPE_LIST = T.ListUtil.of("xapk", "apk"); public static final String EMPTY_FILE_MD5 = "d41d8cd98f00b204e9800998ecf8427e"; + + /** + * tid -> jobId 用于获取 job 运行结果 + */ + public static final Map PLAYBOOK_EXECUTOR_RESULT = T.MapUtil.newHashMap(); } diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java new file mode 100644 index 0000000..d652866 --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java @@ -0,0 +1,133 @@ +package net.geedge.asw.module.runner.job; + +import cn.hutool.http.Header; +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.json.JSONObject; +import cn.hutool.log.Log; +import net.geedge.asw.common.util.Constants; +import net.geedge.asw.common.util.RCode; +import net.geedge.asw.common.util.T; +import net.geedge.asw.module.environment.entity.EnvironmentEntity; +import net.geedge.asw.module.environment.service.IEnvironmentService; +import net.geedge.asw.module.runner.entity.JobEntity; +import net.geedge.asw.module.runner.entity.PcapEntity; +import net.geedge.asw.module.runner.service.IJobService; +import net.geedge.asw.module.runner.service.IPcapService; +import net.geedge.asw.module.runner.util.RunnerConstant; +import org.apache.commons.lang3.time.StopWatch; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import org.springframework.http.MediaType; +import org.springframework.scheduling.quartz.QuartzJobBean; + +import java.io.File; +import java.util.Map; +import java.util.Set; + + +@DisallowConcurrentExecution +public class JobPlaybookExecResultChecker extends QuartzJobBean { + private static final Log log = Log.get(); + + @Autowired + private IEnvironmentService environmentService; + + @Autowired + private IPcapService pcapService; + + @Autowired + private IJobService jobService; + + @Override + protected void executeInternal(JobExecutionContext context) throws JobExecutionException { + + Thread.currentThread().setName("JobPlaybookExecResultChecker"); + + log.info("[JobPlaybookExecResultChecker] [begin]"); + StopWatch sw = new StopWatch(); + sw.start(); + try { + this.playbookExecResultChecker(); + } catch (Exception e) { + log.error(e, "[JobPlaybookExecResultChecker] [error]"); + } finally { + sw.stop(); + } + log.info("[JobPlaybookExecResultChecker] [finshed] [Run Time: {}]", sw.toString()); + } + + private void playbookExecResultChecker() { + Set> entryList = Constants.PLAYBOOK_EXECUTOR_RESULT.entrySet(); + if (entryList.isEmpty()) { + return; + } + for (Map.Entry entry : entryList) { + Thread.ofVirtual().start(() -> { + String tid = entry.getKey(); + String jobId = entry.getValue(); + JobEntity job = jobService.getById(jobId); + EnvironmentEntity environment = environmentService.getById(job.getEnvId()); + log.info("[playbookExecResultChecker] [tid: {}] [jobId: {}] [envId]", tid, jobId, environment.getId()); + + JSONObject paramJSONObject = environment.getParamJSONObject(); + String url = paramJSONObject.getStr("url"); + String token = paramJSONObject.getStr("token"); + HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, tid)); + request.header("Authorization", token); + + HttpResponse response = request.execute(); + log.info("[playbookExecResultChecker] [env: {}] [status: {}]", environment.getId(), response.getStatus()); + + File destination = null; + if (response.isOk()) { + // file + if (MediaType.APPLICATION_OCTET_STREAM_VALUE.equals(response.header(Header.CONTENT_TYPE.getValue()))) { + + String fileName = response.header(Header.CONTENT_DISPOSITION).split("filename=")[1]; + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [env: {}] [result fileName: {}]", environment.getId(), fileName); + } + + destination = T.FileUtil.file(Constants.TEMP_PATH, fileName); + T.FileUtil.writeBytes(response.bodyBytes(), destination); + Resource fileResource = new FileSystemResource(destination); + // upload pcap file + PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId()); + job.setPcapId(pcapEntity.getId()); + job.setStatus(RunnerConstant.JobStatus.PASSED.getValue()); + job.setEndTimestamp(System.currentTimeMillis()); + job.setUpdateTimestamp(System.currentTimeMillis()); + jobService.updateById(job); + Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid); + } else { + + String result = response.body(); + if (log.isDebugEnabled()) { + log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result); + } + + JSONObject jsonObject = T.JSONUtil.parseObj(result); + if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { + JSONObject data = jsonObject.getJSONObject("data"); + String status = data.getStr("status"); + if (!RunnerConstant.JobStatus.RUNNING.getValue().equals(status)) { + job.setStatus(RunnerConstant.JobStatus.FAILED.getValue()); + job.setUpdateTimestamp(System.currentTimeMillis()); + jobService.updateById(job); + Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid); + } + } + } + } + if (destination != null) { + T.FileUtil.del(destination); + } + }); + } + } +} diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java new file mode 100644 index 0000000..dd96d35 --- /dev/null +++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java @@ -0,0 +1,143 @@ +package net.geedge.asw.module.runner.job; + +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.json.JSONObject; +import cn.hutool.log.Log; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import net.geedge.asw.common.util.Constants; +import net.geedge.asw.common.util.RCode; +import net.geedge.asw.common.util.T; +import net.geedge.asw.module.app.entity.PackageEntity; +import net.geedge.asw.module.app.service.IPackageService; +import net.geedge.asw.module.environment.entity.EnvironmentEntity; +import net.geedge.asw.module.environment.service.IEnvironmentService; +import net.geedge.asw.module.runner.entity.JobEntity; +import net.geedge.asw.module.runner.entity.PlaybookEntity; +import net.geedge.asw.module.runner.service.IJobService; +import net.geedge.asw.module.runner.service.IPlaybookService; +import net.geedge.asw.module.runner.util.RunnerConstant; +import org.apache.commons.lang3.time.StopWatch; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.transaction.annotation.Transactional; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +@DisallowConcurrentExecution +public class JobPlaybookExecutor extends QuartzJobBean { + + private static final Log log = Log.get(); + + @Autowired + private IJobService jobService; + + @Autowired + private IEnvironmentService environmentService; + + @Autowired + private IPackageService packageService; + + @Autowired + private IPlaybookService playbookService; + + @Override + protected void executeInternal(JobExecutionContext context) { + Thread.currentThread().setName("JobPlaybookExecutor"); + + log.info("[JobPlaybookExecutor] [begin]"); + StopWatch sw = new StopWatch(); + sw.start(); + try { + this.playbookExecutor(); + } catch (Exception e) { + log.error(e, "[JobPlaybookExecutor] [error]"); + } finally { + sw.stop(); + } + log.info("[JobPlaybookExecutor] [finshed] [Run Time: {}]", sw.toString()); + } + + @Transactional(rollbackFor = Exception.class) + public void playbookExecutor() { + List list = jobService.list(new LambdaQueryWrapper().eq(JobEntity::getStatus, "create")); + Map> jobByEnvList = list.stream().collect(Collectors.groupingBy(JobEntity::getEnvId)); + for (Map.Entry> jobByEnv : jobByEnvList.entrySet()) { + String envId = jobByEnv.getKey(); + List jobList = jobByEnv.getValue(); + Thread.ofVirtual().start(() -> { + for (JobEntity job : jobList) { + List JobRunList = jobService.list(new LambdaQueryWrapper().eq(JobEntity::getStatus, "running").eq(JobEntity::getEnvId, envId)); + if (T.CollUtil.isNotEmpty(JobRunList)) { + continue; + } + EnvironmentEntity environment = environmentService.getById(envId); + if (!environment.getStatus().equals(1)) { + if (log.isDebugEnabled()) { + log.debug("[playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", job.getId(), environment.getId()); + } + continue; + } + + String result = null; + String playbookId = job.getPlaybookId(); + String packageId = job.getPackageId(); + PackageEntity packageEntity = packageService.getById(packageId); + File packageFile = T.FileUtil.file(packageEntity.getPath()); + String packageName = packageEntity.getIdentifier(); + PlaybookEntity playbook = playbookService.getById(playbookId); + File playbookFile = T.FileUtil.file(playbook.getPath()); + + log.info("[playbookExecutor] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId); + + JSONObject paramJSONObject = environment.getParamJSONObject(); + String url = paramJSONObject.getStr("url"); + String token = paramJSONObject.getStr("token"); + + HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url)); + request.form("files", packageFile, playbookFile); + request.form("packageName", packageName); + request.header("Authorization", token); + + HttpResponse response = request.execute(); + log.info("[playbookExecutor] [env] [status: {}]", environment.getId(), response.getStatus()); + if (response.isOk()) { + result = response.body(); + } + + if (log.isDebugEnabled()) { + log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result); + } + + if (T.StrUtil.isNotEmpty(result)) { + try { + JSONObject jsonObject = T.JSONUtil.parseObj(result); + if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) { + JSONObject data = jsonObject.getJSONObject("data"); + String tid = data.getStr("tid"); + Constants.PLAYBOOK_EXECUTOR_RESULT.put(tid, job.getId()); + } + } catch (Exception e) { + log.error(e, "[playbookExecutor] [parse result error] [result: {}]", job.getId(), result); + } + } + + // update job status, starTime, updateTimestamp + jobService.update(new LambdaUpdateWrapper() + .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()) + .set(JobEntity::getUpdateTimestamp, System.currentTimeMillis()) + .set(JobEntity::getStartTimestamp, System.currentTimeMillis()) + .eq(JobEntity::getId, job.getId()) + ); + } + }); + } + } +} diff --git a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java index 5484252..00a29e3 100644 --- a/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java +++ b/src/main/java/net/geedge/asw/module/runner/service/impl/PcapServiceImpl.java @@ -125,8 +125,10 @@ public class PcapServiceImpl extends ServiceImpl implements public PcapEntity savePcap(Resource fileResource, String... params) { String description = T.ArrayUtil.get(params, 0); String workspaceId = T.ArrayUtil.get(params, 1); - String createUserId = T.StrUtil.emptyToDefault(T.ArrayUtil.get(params, 3), StpUtil.getLoginIdAsString()); - + String createUserId = T.ArrayUtil.get(params, 2); + if (T.StrUtil.isEmpty(createUserId)){ + createUserId = StpUtil.getLoginIdAsString(); + } PcapEntity entity = new PcapEntity(); try { String pcapId = T.StrUtil.uuid();