feat: ASW-100 job 定时调度功能开发

This commit is contained in:
zhangshuai
2024-10-21 16:45:45 +08:00
parent fe0a344ec4
commit e68a16a500
5 changed files with 305 additions and 2 deletions

View File

@@ -4,6 +4,8 @@ import cn.hutool.log.Log;
import jakarta.annotation.PostConstruct;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.environment.job.JobEnvironmentStatusChecker;
import net.geedge.asw.module.runner.job.JobPlaybookExecResultChecker;
import net.geedge.asw.module.runner.job.JobPlaybookExecutor;
import net.geedge.asw.module.sys.service.ISysConfigService;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
@@ -48,10 +50,28 @@ public class JobConfig {
.build();
}
@Bean
public JobDetail JobPlaybookExecutor() {
return JobBuilder.newJob(JobPlaybookExecutor.class)
.withIdentity(getJobKey(JobPlaybookExecutor.class.getSimpleName()))
.storeDurably()
.build();
}
@Bean
public JobDetail JobPlaybookExecResultChecker() {
return JobBuilder.newJob(JobPlaybookExecResultChecker.class)
.withIdentity(getJobKey(JobPlaybookExecResultChecker.class.getSimpleName()))
.storeDurably()
.build();
}
@PostConstruct
public void init() throws SchedulerException {
// JobEnvironmentStatusChecker
createCronScheduleJob(JobEnvironmentStatusChecker(), environment.getProperty("asw.cron.JobEnvironmentStatusChecker", "0 0/1 * * * ? *"));
createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0 0/1 * * * ? *"));
createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/30 * * * * ?"));
}
/**

View File

@@ -110,4 +110,9 @@ public class Constants {
public static final List<String> ANDROID_PACKAGE_TYPE_LIST = T.ListUtil.of("xapk", "apk");
public static final String EMPTY_FILE_MD5 = "d41d8cd98f00b204e9800998ecf8427e";
/**
* tid -> jobId 用于获取 job 运行结果
*/
public static final Map<String, String> PLAYBOOK_EXECUTOR_RESULT = T.MapUtil.newHashMap();
}

View File

@@ -0,0 +1,133 @@
package net.geedge.asw.module.runner.job;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.environment.entity.EnvironmentEntity;
import net.geedge.asw.module.environment.service.IEnvironmentService;
import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.entity.PcapEntity;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPcapService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.io.File;
import java.util.Map;
import java.util.Set;
@DisallowConcurrentExecution
public class JobPlaybookExecResultChecker extends QuartzJobBean {
private static final Log log = Log.get();
@Autowired
private IEnvironmentService environmentService;
@Autowired
private IPcapService pcapService;
@Autowired
private IJobService jobService;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
Thread.currentThread().setName("JobPlaybookExecResultChecker");
log.info("[JobPlaybookExecResultChecker] [begin]");
StopWatch sw = new StopWatch();
sw.start();
try {
this.playbookExecResultChecker();
} catch (Exception e) {
log.error(e, "[JobPlaybookExecResultChecker] [error]");
} finally {
sw.stop();
}
log.info("[JobPlaybookExecResultChecker] [finshed] [Run Time: {}]", sw.toString());
}
private void playbookExecResultChecker() {
Set<Map.Entry<String, String>> entryList = Constants.PLAYBOOK_EXECUTOR_RESULT.entrySet();
if (entryList.isEmpty()) {
return;
}
for (Map.Entry<String, String> entry : entryList) {
Thread.ofVirtual().start(() -> {
String tid = entry.getKey();
String jobId = entry.getValue();
JobEntity job = jobService.getById(jobId);
EnvironmentEntity environment = environmentService.getById(job.getEnvId());
log.info("[playbookExecResultChecker] [tid: {}] [jobId: {}] [envId]", tid, jobId, environment.getId());
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, tid));
request.header("Authorization", token);
HttpResponse response = request.execute();
log.info("[playbookExecResultChecker] [env: {}] [status: {}]", environment.getId(), response.getStatus());
File destination = null;
if (response.isOk()) {
// file
if (MediaType.APPLICATION_OCTET_STREAM_VALUE.equals(response.header(Header.CONTENT_TYPE.getValue()))) {
String fileName = response.header(Header.CONTENT_DISPOSITION).split("filename=")[1];
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [env: {}] [result fileName: {}]", environment.getId(), fileName);
}
destination = T.FileUtil.file(Constants.TEMP_PATH, fileName);
T.FileUtil.writeBytes(response.bodyBytes(), destination);
Resource fileResource = new FileSystemResource(destination);
// upload pcap file
PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId());
job.setPcapId(pcapEntity.getId());
job.setStatus(RunnerConstant.JobStatus.PASSED.getValue());
job.setEndTimestamp(System.currentTimeMillis());
job.setUpdateTimestamp(System.currentTimeMillis());
jobService.updateById(job);
Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid);
} else {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result);
}
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String status = data.getStr("status");
if (!RunnerConstant.JobStatus.RUNNING.getValue().equals(status)) {
job.setStatus(RunnerConstant.JobStatus.FAILED.getValue());
job.setUpdateTimestamp(System.currentTimeMillis());
jobService.updateById(job);
Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid);
}
}
}
}
if (destination != null) {
T.FileUtil.del(destination);
}
});
}
}
}

View File

@@ -0,0 +1,143 @@
package net.geedge.asw.module.runner.job;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.app.entity.PackageEntity;
import net.geedge.asw.module.app.service.IPackageService;
import net.geedge.asw.module.environment.entity.EnvironmentEntity;
import net.geedge.asw.module.environment.service.IEnvironmentService;
import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.entity.PlaybookEntity;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPlaybookService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@DisallowConcurrentExecution
public class JobPlaybookExecutor extends QuartzJobBean {
private static final Log log = Log.get();
@Autowired
private IJobService jobService;
@Autowired
private IEnvironmentService environmentService;
@Autowired
private IPackageService packageService;
@Autowired
private IPlaybookService playbookService;
@Override
protected void executeInternal(JobExecutionContext context) {
Thread.currentThread().setName("JobPlaybookExecutor");
log.info("[JobPlaybookExecutor] [begin]");
StopWatch sw = new StopWatch();
sw.start();
try {
this.playbookExecutor();
} catch (Exception e) {
log.error(e, "[JobPlaybookExecutor] [error]");
} finally {
sw.stop();
}
log.info("[JobPlaybookExecutor] [finshed] [Run Time: {}]", sw.toString());
}
@Transactional(rollbackFor = Exception.class)
public void playbookExecutor() {
List<JobEntity> list = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, "create"));
Map<String, List<JobEntity>> jobByEnvList = list.stream().collect(Collectors.groupingBy(JobEntity::getEnvId));
for (Map.Entry<String, List<JobEntity>> jobByEnv : jobByEnvList.entrySet()) {
String envId = jobByEnv.getKey();
List<JobEntity> jobList = jobByEnv.getValue();
Thread.ofVirtual().start(() -> {
for (JobEntity job : jobList) {
List<JobEntity> JobRunList = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, "running").eq(JobEntity::getEnvId, envId));
if (T.CollUtil.isNotEmpty(JobRunList)) {
continue;
}
EnvironmentEntity environment = environmentService.getById(envId);
if (!environment.getStatus().equals(1)) {
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
}
continue;
}
String result = null;
String playbookId = job.getPlaybookId();
String packageId = job.getPackageId();
PackageEntity packageEntity = packageService.getById(packageId);
File packageFile = T.FileUtil.file(packageEntity.getPath());
String packageName = packageEntity.getIdentifier();
PlaybookEntity playbook = playbookService.getById(playbookId);
File playbookFile = T.FileUtil.file(playbook.getPath());
log.info("[playbookExecutor] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url));
request.form("files", packageFile, playbookFile);
request.form("packageName", packageName);
request.header("Authorization", token);
HttpResponse response = request.execute();
log.info("[playbookExecutor] [env] [status: {}]", environment.getId(), response.getStatus());
if (response.isOk()) {
result = response.body();
}
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
}
if (T.StrUtil.isNotEmpty(result)) {
try {
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String tid = data.getStr("tid");
Constants.PLAYBOOK_EXECUTOR_RESULT.put(tid, job.getId());
}
} catch (Exception e) {
log.error(e, "[playbookExecutor] [parse result error] [result: {}]", job.getId(), result);
}
}
// update job status, starTime, updateTimestamp
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.set(JobEntity::getUpdateTimestamp, System.currentTimeMillis())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId())
);
}
});
}
}
}

View File

@@ -125,8 +125,10 @@ public class PcapServiceImpl extends ServiceImpl<PcapDao, PcapEntity> implements
public PcapEntity savePcap(Resource fileResource, String... params) {
String description = T.ArrayUtil.get(params, 0);
String workspaceId = T.ArrayUtil.get(params, 1);
String createUserId = T.StrUtil.emptyToDefault(T.ArrayUtil.get(params, 3), StpUtil.getLoginIdAsString());
String createUserId = T.ArrayUtil.get(params, 2);
if (T.StrUtil.isEmpty(createUserId)){
createUserId = StpUtil.getLoginIdAsString();
}
PcapEntity entity = new PcapEntity();
try {
String pcapId = T.StrUtil.uuid();