feat:ASW-129 playbook job 定时任务调整

This commit is contained in:
zhangshuai
2024-11-04 09:28:20 +08:00
parent 54bcd63e33
commit 5943b5c514
7 changed files with 249 additions and 87 deletions

View File

@@ -70,8 +70,10 @@ public class JobConfig {
public void init() throws SchedulerException {
// JobEnvironmentStatusChecker
createCronScheduleJob(JobEnvironmentStatusChecker(), environment.getProperty("asw.cron.JobEnvironmentStatusChecker", "0 0/1 * * * ? *"));
createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0 0/1 * * * ? *"));
createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/30 * * * * ?"));
// JobPlaybookExecutor
createCronScheduleJob(JobPlaybookExecutor(), environment.getProperty("asw.cron.JobPlaybookExecutor", "0/30 * * * * ?"));
// JobPlaybookExecResultChecker
createCronScheduleJob(JobPlaybookExecResultChecker(), environment.getProperty("asw.cron.JobPlaybookExecResultChecker", "0/10 * * * * ?"));
}
/**

View File

@@ -111,11 +111,6 @@ public class Constants {
public static final String EMPTY_FILE_MD5 = "d41d8cd98f00b204e9800998ecf8427e";
/**
* tid -> jobId 用于获取 job 运行结果
*/
public static final Map<String, String> PLAYBOOK_EXECUTOR_RESULT = T.MapUtil.newHashMap();
/**
* 系统内置角色
*/

View File

@@ -75,4 +75,14 @@ public class JobController {
return R.ok();
}
@GetMapping("/{workspaceId}/job/{id}/log")
public R getJobLog(@PathVariable("workspaceId") String workspaceId,
@PathVariable("id") String id,
@RequestParam Integer offset) {
offset = offset == null ? 0 : offset;
Map result = jobService.queryJobLog(id, offset);
return R.ok().putData("record", result);
}
}

View File

@@ -5,6 +5,8 @@ import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.PreDestroy;
import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
@@ -24,12 +26,15 @@ import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@DisallowConcurrentExecution
@@ -49,6 +54,10 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
@Autowired
private IPackageService packageService;
// 用于追踪虚拟线程,每个 Job 只对应一个虚拟线程
private final Map<String, Thread> runningJobThreads = new ConcurrentHashMap<>();
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
@@ -68,75 +77,186 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
}
private void playbookExecResultChecker() {
Set<Map.Entry<String, String>> entryList = Constants.PLAYBOOK_EXECUTOR_RESULT.entrySet();
if (entryList.isEmpty()) {
List<JobEntity> jobList = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()));
if (jobList.isEmpty()) {
return;
}
for (Map.Entry<String, String> entry : entryList) {
for (JobEntity job : jobList) {
Thread.ofVirtual().start(() -> {
String tid = entry.getKey();
String jobId = entry.getValue();
JobEntity job = jobService.getById(jobId);
String id = job.getId();
EnvironmentEntity environment = environmentService.getById(job.getEnvId());
log.info("[playbookExecResultChecker] [tid: {}] [jobId: {}] [envId]", tid, jobId, environment.getId());
log.info("[playbookExecResultChecker] [jobId: {}] [envId]", id, environment.getId());
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, tid));
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, id));
request.header("Authorization", token);
HttpResponse response = request.execute();
log.info("[playbookExecResultChecker] [env: {}] [status: {}]", environment.getId(), response.getStatus());
log.info("[playbookExecResultChecker] [request env api] [env: {}] [status: {}]", environment.getId(), response.getStatus());
File destination = null;
if (response.isOk()) {
// file
if (MediaType.APPLICATION_OCTET_STREAM_VALUE.equals(response.header(Header.CONTENT_TYPE.getValue()))) {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result);
}
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String status = data.getStr("status");
PackageEntity packageEntity = packageService.getById(job.getPackageId());
// pcap name {package.name}-job-{jobId[0:8]}.pcap
String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED , T.StrUtil.sub(jobId, 0, 8), ".pcap");
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [env: {}] [result fileName: {}]", environment.getId(), fileName);
}
destination = T.FileUtil.file(Constants.TEMP_PATH, fileName);
T.FileUtil.writeBytes(response.bodyBytes(), destination);
Resource fileResource = new FileSystemResource(destination);
// upload pcap file
PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId());
job.setPcapId(pcapEntity.getId());
job.setStatus(RunnerConstant.JobStatus.PASSED.getValue());
job.setEndTimestamp(System.currentTimeMillis());
job.setUpdateTimestamp(System.currentTimeMillis());
jobService.updateById(job);
Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid);
} else {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [env: {}] [result: {}]", environment.getId(), result);
}
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String status = data.getStr("status");
if (!RunnerConstant.JobStatus.RUNNING.getValue().equals(status)) {
job.setStatus(RunnerConstant.JobStatus.FAILED.getValue());
job.setUpdateTimestamp(System.currentTimeMillis());
job.setEndTimestamp(System.currentTimeMillis());
jobService.updateById(job);
Constants.PLAYBOOK_EXECUTOR_RESULT.remove(tid);
}
switch (status){
case "running":
runningJobThreads.computeIfAbsent(job.getId(), jobId -> startJobVirtualThread(job, environment));
break;
case "error":
this.getJobResult(job, RunnerConstant.JobStatus.FAILED.getValue(), environment);
break;
case "done":
this.getJobResult(job, RunnerConstant.JobStatus.PASSED.getValue(), environment);
break;
}
}
}
if (destination != null) {
T.FileUtil.del(destination);
}
});
}
}
private Thread startJobVirtualThread(JobEntity job, EnvironmentEntity environment) {
Thread virtualThread = Thread.ofVirtual().start(() -> {
try {
while (true) {
if (isJobInRunningStatus(job)) {
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [Job status updated] [stopJobVirtualThread ] [job id: {}]", job.getId());
}
runningJobThreads.remove(job.getId());
break;
}
performJobLogic(job, environment);
Thread.sleep(2000); // 每 2 秒执行一次
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
});
return virtualThread;
}
// 检查 Job 的状态是否为 running
private boolean isJobInRunningStatus(JobEntity jobEntity) {
JobEntity job = jobService.getById(jobEntity.getId());
return job != null && !T.StrUtil.equalsIgnoreCase(job.getStatus(), RunnerConstant.JobStatus.RUNNING.getValue());
}
/**
* 获取 playbook 执行日志
* @param job
* @param environment
*/
private void performJobLogic(JobEntity job, EnvironmentEntity environment) {
File logFile = T.FileUtil.file(job.getLogPath());
Integer offset = 0;
if (logFile.exists()){
offset = T.FileUtil.readBytes(logFile).length;
}
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/log", url, job.getId()));
request.form("offset", offset);
request.header("Authorization", token);
HttpResponse response = request.execute();
if (response.isOk()) {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [performJobLogic] [env: {}] [result: {}]", environment.getId(), result);
}
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String content = data.getStr("content");
T.FileUtil.appendString(content, logFile, "UTF-8");
}
}
}
@PreDestroy
public void shutdown() {
runningJobThreads.values().forEach(Thread::interrupt);
runningJobThreads.clear();
}
/**
* get pcap log
* @param job
* @param value job status: error and done
* @param environment
*/
private void getJobResult(JobEntity job, String value, EnvironmentEntity environment) {
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [status: {}]", job.getId(), value);
File destination = null;
File pcapDestination = null;
InputStream inputStream = null;
ZipFile zipFile = null;
try {
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest request = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s/artifact", url, job.getId()));
request.header("Authorization", token);
HttpResponse response = request.execute();
log.info("[playbookExecResultChecker] [getJobResult] [jod id: {}] [request env api] [status: {}]", job.getId(), response.getStatus());
if (response.isOk()) {
destination = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
T.FileUtil.writeBytes(response.bodyBytes(), destination);
zipFile = T.ZipUtil.toZipFile(destination, T.CharsetUtil.CHARSET_UTF_8);
List<? extends ZipEntry> list = zipFile.stream().toList();
for (ZipEntry entry : list) {
if (entry.getName().endsWith("pcap")) {
PackageEntity packageEntity = packageService.getById(job.getPackageId());
// pcap name {package.name}-job-{jobId[0:8]}.pcap
String fileName = T.StrUtil.concat(true, packageEntity.getName(), T.StrUtil.DASHED, "job", T.StrUtil.DASHED, T.StrUtil.sub(job.getId(), 0, 8), ".pcap");
pcapDestination = T.FileUtil.file(Constants.TEMP_PATH, fileName);
inputStream = T.ZipUtil.get(zipFile, entry.getName());
T.FileUtil.writeFromStream(inputStream, pcapDestination);
Resource fileResource = new FileSystemResource(pcapDestination);
// upload pcap file
PcapEntity pcapEntity = pcapService.savePcap(fileResource, "", job.getWorkspaceId(), job.getCreateUserId());
if (log.isDebugEnabled()) {
log.debug("[playbookExecResultChecker] [getJobResult] [job id: {}]: {}] [upload pcap: {}]", job.getId(), T.JSONUtil.toJsonStr(pcapEntity));
}
job.setPcapId(pcapEntity.getId());
}else {
// log
inputStream = T.ZipUtil.get(zipFile, entry.getName());
File logFile = T.FileUtil.file(job.getLogPath());
T.FileUtil.writeFromStream(inputStream, logFile);
}
}
job.setStatus(value);
job.setUpdateTimestamp(System.currentTimeMillis());
jobService.updateById(job);
}
} catch (Exception e) {
log.error("[playbookExecResultChecker] [getJobResult] [error]", e);
}finally {
T.IoUtil.close(zipFile);
T.FileUtil.del(destination);
T.FileUtil.del(pcapDestination);
T.IoUtil.close(inputStream);
}
}
}

View File

@@ -86,7 +86,6 @@ public class JobPlaybookExecutor extends QuartzJobBean {
continue;
}
String result = null;
String playbookId = job.getPlaybookId();
String packageId = job.getPackageId();
PackageEntity packageEntity = packageService.getById(packageId);
@@ -101,41 +100,42 @@ public class JobPlaybookExecutor extends QuartzJobBean {
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
File zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url));
request.form("files", packageFile, playbookFile);
T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
request.form("file", zipFile);
request.form("id", job.getId());
request.form("packageName", packageName);
request.header("Authorization", token);
HttpResponse response = request.execute();
log.info("[playbookExecutor] [env] [status: {}]", environment.getId(), response.getStatus());
log.info("[playbookExecutor] [env: {}] [status: {}]", environment.getId(), response.getStatus());
if (response.isOk()) {
result = response.body();
}
// update job status, starTime, updateTimestamp
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId())
);
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
}
if (T.StrUtil.isNotEmpty(result)) {
try {
JSONObject jsonObject = T.JSONUtil.parseObj(result);
if (T.ObjectUtil.equal(RCode.SUCCESS.getCode(), jsonObject.getInt("code"))) {
JSONObject data = jsonObject.getJSONObject("data");
String tid = data.getStr("tid");
Constants.PLAYBOOK_EXECUTOR_RESULT.put(tid, job.getId());
}
} catch (Exception e) {
log.error(e, "[playbookExecutor] [parse result error] [result: {}]", job.getId(), result);
}else {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
}
}
// update job status, starTime, updateTimestamp
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.set(JobEntity::getUpdateTimestamp, System.currentTimeMillis())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId())
);
File logFile = T.FileUtil.file(job.getLogPath());
T.FileUtil.appendString(String.format("ERROR: Request %s environment error \n", environment.getId()), logFile, "UTF-8");
T.FileUtil.appendString(String.format("Result: %s", result), logFile, "UTF-8");
// update job status, starTime, updateTimestamp
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.set(JobEntity::getEndTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId()));
}
}
});
}

View File

@@ -17,6 +17,8 @@ public interface IJobService extends IService<JobEntity>{
void removeJob(List<String> ids);
Map queryJobLog(String id, Integer offset);
// JobEntity assignPendingJob(String id, String platform);
//
// void appendTraceLogStrToFile(String jobId, String content) throws RuntimeException;

View File

@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import net.geedge.asw.common.config.Query;
import net.geedge.asw.common.util.ASWException;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.app.entity.PackageEntity;
@@ -19,10 +20,13 @@ import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPlaybookService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.servlet.MultipartAutoConfiguration;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.Map;
@@ -117,6 +121,35 @@ public class JobServiceImpl extends ServiceImpl<JobDao, JobEntity> implements IJ
this.removeBatchByIds(ids);
}
@Override
public Map queryJobLog(String id, Integer offset) {
JobEntity job = this.getById(id);
Map result = T.MapUtil.newHashMap();
File logFile = T.FileUtil.file(job.getLogPath());
if (logFile.exists()){
try (RandomAccessFile raf = new RandomAccessFile(logFile, "r")) {
raf.seek(offset);
byte[] bytes = new byte[(int)raf.length() - offset];
raf.readFully(bytes);
String content = new String(bytes);
result.put("content", content);
result.put("length", bytes.length);
result.put("offset", offset + bytes.length);
} catch (IOException e) {
log.error("queryJobLog error", e);
throw new ASWException(RCode.ERROR);
}
}else {
result.put("content", T.StrUtil.EMPTY);
result.put("length", 0);
result.put("offset", 0);
}
return result;
}
// @Override
// public synchronized JobEntity assignPendingJob(String runnerId, String platform) {
// if (T.StrUtil.hasEmpty(runnerId, platform)) {