fix: 调整 job 执行流程

This commit is contained in:
zhangshuai
2024-11-20 16:47:19 +08:00
parent 0133dc72b2
commit 70feac12fc
4 changed files with 296 additions and 85 deletions

View File

@@ -0,0 +1,109 @@
package net.geedge.asw.common.config;
import cn.hutool.core.io.FileUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.Method;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.environment.entity.EnvironmentEntity;
import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
import net.geedge.asw.module.environment.service.IEnvironmentService;
import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.util.JobQueueManager;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* setup初始化操作
*/
@Component
public class SetupRunner implements CommandLineRunner{
private static final Log log = Log.get();
@Autowired
private IJobService jobService;
@Autowired
private JobQueueManager jobQueueManager;
@Autowired
private IEnvironmentService environmentService;
@Autowired
private IEnvironmentSessionService environmentSessionService;
@Override
public void run(String... args) throws Exception {
log.info("Setup inited");
List<JobEntity> pendingJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.PENDING.getValue()));
pendingJobs.forEach(jobQueueManager::addJob);
log.info("[SetupRunner] [init pending job to JobQueueManager]");
log.info("[SetupRunner] [begin interrupted running job]");
List<JobEntity> runningJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()));
for (JobEntity runningJob : runningJobs) {
String id = runningJob.getId();
EnvironmentEntity environment = environmentService.getById(runningJob.getEnvId());
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
HttpRequest requestStatus = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, runningJob.getId()));
requestStatus.header("Authorization", token);
HttpResponse response = requestStatus.execute();
if (response.isOk()){
String body = response.body();
JSONObject result = T.JSONUtil.toBean(body, JSONObject.class);
JSONObject data = result.getJSONObject("data");
String status = data.getStr("status");
if (RunnerConstant.JobStatus.RUNNING.getValue().equals(status)){
HttpRequest request = T.HttpUtil.createRequest(Method.DELETE, String.format("%s/api/v1/env/playbook/%s", url, runningJob.getId()));
request.header("Authorization", token);
request.execute();
}
}
Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
if (runningThread != null) {
runningThread.interrupt();
}
Thread resultThread = Constants.RESULT_JOB_THREAD.get(id);
if (resultThread != null) {
resultThread.interrupt();
}
EnvironmentSessionEntity session = environmentSessionService.getOne(new LambdaQueryWrapper<EnvironmentSessionEntity>()
.eq(EnvironmentSessionEntity::getJobId, id)
.eq(EnvironmentSessionEntity::getStatus, 1));
if (T.ObjectUtil.isNotEmpty(session)) {
environmentService.removeSession(session.getId());
}
T.FileUtil.appendString("Job execution interrupted.", FileUtil.file(runningJob.getLogPath()), "UTF-8");
// update state
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.eq(JobEntity::getId, id)
.set(JobEntity::getStatus, "failed")
);
}
log.info("[SetupRunner] [interrupted running job end!]");
}
}

View File

@@ -17,6 +17,7 @@ import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
import net.geedge.asw.module.environment.service.IEnvironmentService;
import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.util.JobQueueManager;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,6 +42,9 @@ public class JobController {
@Autowired
private IEnvironmentSessionService sessionService;
@Autowired
private JobQueueManager jobQueueManager;
@GetMapping("/{workspaceId}/job/{id}")
public R detail(@PathVariable("workspaceId") String workspaceId,
@PathVariable("id") String id) {
@@ -117,6 +121,10 @@ public class JobController {
log.info("[cancelJob] [request env stop playbook] [status: {}]", response.body());
}
if (job.getStatus().contains(RunnerConstant.JobStatus.PENDING.getValue())){
jobQueueManager.requeueJob(job);
}
Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
if (runningThread != null) {
runningThread.interrupt();

View File

@@ -7,7 +7,6 @@ import cn.hutool.log.Log;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.app.entity.PackageEntity;
import net.geedge.asw.module.app.service.IPackageService;
@@ -19,6 +18,7 @@ import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.entity.PlaybookEntity;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPlaybookService;
import net.geedge.asw.module.runner.util.JobQueueManager;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.DisallowConcurrentExecution;
@@ -30,10 +30,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@DisallowConcurrentExecution
@SuppressWarnings("all")
public class JobPlaybookExecutor extends QuartzJobBean {
private static final Log log = Log.get();
@@ -53,6 +52,9 @@ public class JobPlaybookExecutor extends QuartzJobBean {
@Autowired
private IEnvironmentSessionService environmentSessionService;
@Autowired
private JobQueueManager jobQueueManager;
@Override
protected void executeInternal(JobExecutionContext context) {
Thread.currentThread().setName("JobPlaybookExecutor");
@@ -72,121 +74,152 @@ public class JobPlaybookExecutor extends QuartzJobBean {
@Transactional(rollbackFor = Exception.class)
public void playbookExecutor() {
List<JobEntity> createdList = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue()));
Map<String, List<JobEntity>> jobByEnvList = createdList.stream().collect(Collectors.groupingBy(JobEntity::getEnvId));
for (Map.Entry<String, List<JobEntity>> jobByEnv : jobByEnvList.entrySet()) {
String envId = jobByEnv.getKey();
List<JobEntity> jobList = jobByEnv.getValue();
T.ThreadUtil.execAsync(() -> {
for (JobEntity job : jobList) {
List<JobEntity> JobRunList = jobService.list(new LambdaQueryWrapper<JobEntity>()
.eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.eq(JobEntity::getEnvId, envId));
if (T.CollUtil.isNotEmpty(JobRunList)) {
continue;
}
List<JobEntity> createdJobs = jobService.list(
new LambdaQueryWrapper<JobEntity>()
.eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue())
.orderByAsc(JobEntity::getCreateTimestamp)
);
EnvironmentEntity environment = environmentService.getById(envId);
if (!environment.getStatus().equals(1)) {
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
}
continue;
}
if (T.CollUtil.isNotEmpty(createdJobs)) {
log.info("[JobPlaybookExecutor] [playbookExecutor] [fetching created jobs] [size: {}]", createdJobs.size());
// 将 CREATED 任务加入队列
createdJobs.forEach(jobQueueManager::addJob);
// 更新 createdJobs 状态为 pending
createdJobs.forEach(x -> x.setStatus(RunnerConstant.JobStatus.PENDING.getValue()));
jobService.updateBatchById(createdJobs);
}
List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
.eq(EnvironmentSessionEntity::getStatus, "1")
.eq(EnvironmentSessionEntity::getEnvId, envId));
if (T.CollUtil.isNotEmpty(sessionList)) {
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
}
continue;
}
// 处理队列中的任务
if (!jobQueueManager.isAllQueuesEmpty()) {
List<JobEntity> nextJobList = jobQueueManager.fetchNextJob();
for (JobEntity nextJob : nextJobList) {
String envId = nextJob.getEnvId();
log.info("[JobPlaybookExecutor] [playbookExecutor] [Processing jobId: {}] [envId: {}]", nextJob.getId(), envId);
// update job status running
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId())
);
// add session
EnvironmentSessionEntity session = new EnvironmentSessionEntity();
session.setEnvId(envId);
session.setJobId(job.getId());
session.setStatus(1);
session.setUserId("system");
session.setWorkspaceId(job.getWorkspaceId());
session.setStartTimestamp(System.currentTimeMillis());
environmentSessionService.save(session);
HttpResponse response = requestEnvPlaybook(job, environment);
log.info("[playbookExecutor] [job id: {}] [env: {}] [status: {}]", job.getId(), environment.getId(), response.getStatus());
if (!response.isOk()) {
String result = response.body();
if (log.isDebugEnabled()) {
log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
}
File logFile = T.FileUtil.file(job.getLogPath());
T.FileUtil.appendString(String.format("ERROR: Request %s environment error \n", environment.getId()), logFile, "UTF-8");
T.FileUtil.appendString(String.format("Result: %s", result), logFile, "UTF-8");
// update job status, starTime, updateTimestamp
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
.set(JobEntity::getEndTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, job.getId()));
// remove session
environmentService.removeSession(session.getId());
log.info("[playbookExecutor] [request env exec playbook error] [job id: {}]", job.getId());
}
EnvironmentEntity environment = environmentService.getById(envId);
if (!environment.getStatus().equals(1)) {
log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
jobQueueManager.requeueJob(nextJob); // 将任务放回队列
continue;
}
});
List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
.eq(EnvironmentSessionEntity::getStatus, "1")
.eq(EnvironmentSessionEntity::getEnvId, envId));
if (T.CollUtil.isNotEmpty(sessionList)) {
log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
jobQueueManager.requeueJob(nextJob); // 将任务放回队列
continue;
}
// update job status running
jobService.update(new LambdaUpdateWrapper<JobEntity>()
.set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
.set(JobEntity::getStartTimestamp, System.currentTimeMillis())
.eq(JobEntity::getId, nextJob.getId())
);
// add session
EnvironmentSessionEntity session = new EnvironmentSessionEntity();
session.setEnvId(envId);
session.setJobId(nextJob.getId());
session.setStatus(1);
session.setUserId("system");
session.setWorkspaceId(nextJob.getWorkspaceId());
session.setStartTimestamp(System.currentTimeMillis());
environmentSessionService.save(session);
// 执行任务
processJobAsync(nextJob, environment, session);
}
}
}
private HttpResponse requestEnvPlaybook(JobEntity job, EnvironmentEntity environment) {
/**
 * Submits the job to its environment on a background thread.
 *
 * <p>If the environment answers with a non-OK response, the response body is appended
 * to the job log and the job is marked as failed. If the request throws, the job is
 * marked as failed and the exception is rethrown (wrapped) on the background thread.
 * In both failure cases the environment session is released.
 *
 * @param job         the job to execute
 * @param environment the environment the job is sent to
 * @param session     the environment session created for this job
 */
private void processJobAsync(JobEntity job, EnvironmentEntity environment, EnvironmentSessionEntity session) {
    T.ThreadUtil.execAsync(() -> {
        log.info("[JobPlaybookExecutor] [processJobAsync] [start jobId: {}]", job.getId());
        boolean failed = false;
        Exception failure = null;
        try {
            // send the request
            HttpResponse response = requestEnvironment(job, environment);
            if (!response.isOk()) {
                String result = response.body();
                log.warn("[JobPlaybookExecutor] [processJobAsync] [envId: {}] [result: {}]", environment.getId(), result);
                File logFile = T.FileUtil.file(job.getLogPath());
                T.FileUtil.appendString(String.format("ERROR: Request %s environment error! msg: %s.\n", environment.getName(), result), logFile, "UTF-8");
                failed = true;
            }
        } catch (Exception e) {
            failed = true;
            failure = e;
        }

        // Run the failure handling exactly once, whichever path detected the failure.
        if (failed) {
            failJobAndReleaseSession(job, session);
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
        log.info("[JobPlaybookExecutor] [processJobAsync] [Finished jobId: {}]", job.getId());
    });
}

/**
 * Marks the job as failed (status and end timestamp) and removes its environment session.
 * The session is removed even when the status update throws, so the environment is not
 * left looking occupied.
 *
 * @param job     the job to mark as failed
 * @param session the session to release
 */
private void failJobAndReleaseSession(JobEntity job, EnvironmentSessionEntity session) {
    try {
        jobService.update(new LambdaUpdateWrapper<JobEntity>()
                .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
                .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
                .eq(JobEntity::getId, job.getId()));
    } finally {
        // remove session
        environmentService.removeSession(session.getId());
    }
}
private HttpResponse requestEnvironment(JobEntity job, EnvironmentEntity environment) {
File zipFile = null;
try {
String playbookId = job.getPlaybookId();
String packageId = job.getPackageId();
// package playbook file
PackageEntity packageEntity = packageService.getById(packageId);
File packageFile = T.FileUtil.file(packageEntity.getPath());
PlaybookEntity playbook = playbookService.getById(playbookId);
File playbookFile = T.FileUtil.file(playbook.getPath());
// zip
zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
// parameters
String packageName = packageEntity.getIdentifier();
String parameters = job.getParameters();
Map<String, Object> params = T.MapUtil.newHashMap();
if (T.StrUtil.isNotEmpty(parameters)) {
params = T.JSONUtil.toBean(parameters, Map.class);
}else {
} else {
params.put("reInstall", true);
params.put("clearCache", true);
params.put("unInstall", true);
}
PlaybookEntity playbook = playbookService.getById(playbookId);
File playbookFile = T.FileUtil.file(playbook.getPath());
log.info("[playbookExecutor] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
JSONObject paramJSONObject = environment.getParamJSONObject();
String url = paramJSONObject.getStr("url");
String token = paramJSONObject.getStr("token");
zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
// build request
log.info("[JobPlaybookExecutor] [requestEnvironment] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url));
T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
request.header("Authorization", token);
request.form("file", zipFile);
request.form("id", job.getId());
request.form("packageName", packageName);
request.header("Authorization", token);
for (Map.Entry<String, Object> param : params.entrySet()) {
request.form(param.getKey(), param.getValue());
}
HttpResponse response = request.execute();
return response;
} catch (Exception e) {
log.error("[JobPlaybookExecutor] [requestEnvironment] [error] [jobId: {}]", job.getId(), e);
throw new RuntimeException(e);
} finally {
T.FileUtil.del(zipFile);
}

View File

@@ -0,0 +1,61 @@
package net.geedge.asw.module.runner.util;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.runner.entity.JobEntity;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Job queue manager.
 *
 * <p>Keeps one priority queue of jobs per environment id. Within a queue, jobs are
 * ordered by {@code JobEntity::getCreateTimestamp}. Every public method is
 * {@code synchronized} on this instance.
 */
@Component
public class JobQueueManager {

    // One queue per environment id, ordered by job creation timestamp.
    private final Map<String, PriorityQueue<JobEntity>> groupedQueues = new ConcurrentHashMap<>();

    /**
     * Adds a job to the queue of its environment, creating the queue if necessary.
     *
     * @param job the job to enqueue
     */
    public synchronized void addJob(JobEntity job) {
        queueFor(job.getEnvId()).offer(job);
    }

    /**
     * Removes and returns the head job of every non-empty environment queue.
     *
     * @return at most one job per environment; empty when all queues are empty
     */
    public synchronized List<JobEntity> fetchNextJob() {
        List<JobEntity> nextJobs = new ArrayList<>();
        for (PriorityQueue<JobEntity> queue : groupedQueues.values()) {
            if (queue == null || queue.isEmpty()) {
                continue; // nothing queued for this environment
            }
            nextJobs.add(queue.poll());
        }
        return nextJobs;
    }

    /**
     * Checks whether every environment queue is empty.
     *
     * @return {@code true} when no job is queued for any environment
     */
    public synchronized boolean isAllQueuesEmpty() {
        return groupedQueues.values().stream().allMatch(PriorityQueue::isEmpty);
    }

    /**
     * Puts a job back into the queue of its environment. The queue is created when it
     * does not exist yet, so this no longer fails for an environment that has never
     * had a job queued.
     *
     * @param job the job to enqueue again
     */
    public synchronized void requeueJob(JobEntity job) {
        queueFor(job.getEnvId()).offer(job);
    }

    /**
     * Removes a job from the queue of its environment. Does nothing when the environment
     * has no queue. A queued entry matches when it equals {@code job} or carries the
     * same non-null id, so a freshly loaded copy of the entity can be used for removal.
     *
     * @param job the job to remove
     */
    public synchronized void removeJob(JobEntity job) {
        PriorityQueue<JobEntity> queue = groupedQueues.get(job.getEnvId());
        if (queue == null) {
            return;
        }
        String jobId = job.getId();
        queue.removeIf(queued -> queued.equals(job) || (jobId != null && jobId.equals(queued.getId())));
    }

    // Returns the queue of the given environment, creating it on first use.
    private PriorityQueue<JobEntity> queueFor(String envId) {
        return groupedQueues.computeIfAbsent(envId,
                key -> new PriorityQueue<JobEntity>(Comparator.comparing(JobEntity::getCreateTimestamp)));
    }
}