diff --git a/src/main/java/net/geedge/asw/common/config/SetupRunner.java b/src/main/java/net/geedge/asw/common/config/SetupRunner.java
new file mode 100644
index 0000000..116566d
--- /dev/null
+++ b/src/main/java/net/geedge/asw/common/config/SetupRunner.java
@@ -0,0 +1,142 @@
+package net.geedge.asw.common.config;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.Method;
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import net.geedge.asw.common.util.Constants;
+import net.geedge.asw.common.util.T;
+import net.geedge.asw.module.environment.entity.EnvironmentEntity;
+import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
+import net.geedge.asw.module.environment.service.IEnvironmentService;
+import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
+import net.geedge.asw.module.runner.entity.JobEntity;
+import net.geedge.asw.module.runner.util.JobQueueManager;
+import net.geedge.asw.module.runner.service.IJobService;
+import net.geedge.asw.module.runner.util.RunnerConstant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+
+/**
+ * Startup task that restores the job state after an application restart.
+ *
+ * <p>Jobs stored as PENDING are put back into the {@link JobQueueManager}. Jobs stored as
+ * RUNNING are stopped on their environment (best effort), their active session is removed
+ * and they are marked as failed.
+ */
+@Component
+public class SetupRunner implements CommandLineRunner{
+    private static final Log log = Log.get();
+
+    @Autowired
+    private IJobService jobService;
+
+    @Autowired
+    private JobQueueManager jobQueueManager;
+
+    @Autowired
+    private IEnvironmentService environmentService;
+
+    @Autowired
+    private IEnvironmentSessionService environmentSessionService;
+
+    /**
+     * Re-queues the pending jobs and fails every job that is still stored as running.
+     *
+     * @param args command line arguments, not used
+     */
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("Setup inited");
+        List<JobEntity> pendingJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.PENDING.getValue()));
+        pendingJobs.forEach(jobQueueManager::addJob);
+        log.info("[SetupRunner] [init pending job to JobQueueManager]");
+
+
+        log.info("[SetupRunner] [begin interrupted running job]");
+        List<JobEntity> runningJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()));
+        for (JobEntity runningJob : runningJobs) {
+            String id = runningJob.getId();
+            try {
+                stopRemotePlaybook(runningJob);
+            } catch (Exception e) {
+                // an unreachable environment must neither abort the application startup
+                // nor keep the remaining jobs from being cleaned up
+                log.error(e, "[SetupRunner] [stop env playbook error] [jobId: {}]", id);
+            }
+
+            Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
+            if (runningThread != null) {
+                runningThread.interrupt();
+            }
+
+            Thread resultThread = Constants.RESULT_JOB_THREAD.get(id);
+            if (resultThread != null) {
+                resultThread.interrupt();
+            }
+            EnvironmentSessionEntity session = environmentSessionService.getOne(new LambdaQueryWrapper<EnvironmentSessionEntity>()
+                    .eq(EnvironmentSessionEntity::getJobId, id)
+                    .eq(EnvironmentSessionEntity::getStatus, 1));
+
+            if (T.ObjectUtil.isNotEmpty(session)) {
+                environmentService.removeSession(session.getId());
+            }
+
+            T.FileUtil.appendString("Job execution interrupted.", FileUtil.file(runningJob.getLogPath()), "UTF-8");
+
+            // update state
+            jobService.update(new LambdaUpdateWrapper<JobEntity>()
+                    .eq(JobEntity::getId, id)
+                    .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
+                    .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
+            );
+        }
+
+        log.info("[SetupRunner] [interrupted running job end!]");
+    }
+
+    /**
+     * Asks the job's environment for the playbook status and requests a stop when the
+     * playbook is still reported as running.
+     *
+     * @param job job that was stored as running when the application started
+     */
+    private void stopRemotePlaybook(JobEntity job) {
+        EnvironmentEntity environment = environmentService.getById(job.getEnvId());
+        if (environment == null) {
+            log.warn("[SetupRunner] [environment not found] [jobId: {}] [envId: {}]", job.getId(), job.getEnvId());
+            return;
+        }
+
+        JSONObject paramJSONObject = environment.getParamJSONObject();
+        String url = paramJSONObject.getStr("url");
+        String token = paramJSONObject.getStr("token");
+        String playbookUrl = String.format("%s/api/v1/env/playbook/%s", url, job.getId());
+
+        HttpRequest requestStatus = T.HttpUtil.createGet(playbookUrl);
+        requestStatus.header("Authorization", token);
+
+        HttpResponse response = requestStatus.execute();
+        if (!response.isOk()) {
+            return;
+        }
+        JSONObject result = T.JSONUtil.toBean(response.body(), JSONObject.class);
+        JSONObject data = result.getJSONObject("data");
+        // a response without a "data" object carries no status, so there is nothing to stop
+        if (data == null || !RunnerConstant.JobStatus.RUNNING.getValue().equals(data.getStr("status"))) {
+            return;
+        }
+
+        HttpRequest request = T.HttpUtil.createRequest(Method.DELETE, playbookUrl);
+        request.header("Authorization", token);
+        request.execute();
+    }
+}
diff --git a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
index 461e1b9..2e13f42 100644
--- a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
+++ b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
@@ -17,6 +17,7 @@ import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
 import net.geedge.asw.module.environment.service.IEnvironmentService;
 import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
 import net.geedge.asw.module.runner.entity.JobEntity;
+import net.geedge.asw.module.runner.util.JobQueueManager;
 import net.geedge.asw.module.runner.service.IJobService;
 import net.geedge.asw.module.runner.util.RunnerConstant;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,6 +42,9 @@ public class JobController {
     @Autowired
     private IEnvironmentSessionService sessionService;
 
+    @Autowired
+    private JobQueueManager jobQueueManager;
+
     @GetMapping("/{workspaceId}/job/{id}")
     public R detail(@PathVariable("workspaceId") String workspaceId,
                     @PathVariable("id") String id) {
@@ -117,6 +121,11 @@ public class JobController {
             log.info("[cancelJob] [request env stop playbook] [status: {}]", response.body());
         }
 
+        // a cancelled job must not be scheduled any more: take it out of the pending queue
+        if (RunnerConstant.JobStatus.PENDING.getValue().equals(job.getStatus())) {
+            jobQueueManager.removeJob(job);
+        }
+
         Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
         if (runningThread != null) {
             runningThread.interrupt();
diff --git 
a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
index 3e9907c..43843aa 100644
--- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
+++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
@@ -7,7 +7,6 @@ import cn.hutool.log.Log;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import net.geedge.asw.common.util.Constants;
-import net.geedge.asw.common.util.RCode;
 import net.geedge.asw.common.util.T;
 import net.geedge.asw.module.app.entity.PackageEntity;
 import net.geedge.asw.module.app.service.IPackageService;
@@ -19,6 +18,7 @@ import net.geedge.asw.module.runner.entity.JobEntity;
 import net.geedge.asw.module.runner.entity.PlaybookEntity;
 import net.geedge.asw.module.runner.service.IJobService;
 import net.geedge.asw.module.runner.service.IPlaybookService;
+import net.geedge.asw.module.runner.util.JobQueueManager;
 import net.geedge.asw.module.runner.util.RunnerConstant;
 import org.apache.commons.lang3.time.StopWatch;
 import org.quartz.DisallowConcurrentExecution;
@@ -30,10 +30,9 @@ import org.springframework.transaction.annotation.Transactional;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-
 
 @DisallowConcurrentExecution
+@SuppressWarnings("all")
 public class JobPlaybookExecutor extends QuartzJobBean {
     private static final Log log = Log.get();
 
@@ -53,6 +52,9 @@ public class JobPlaybookExecutor extends QuartzJobBean {
     @Autowired
     private IEnvironmentSessionService environmentSessionService;
 
+    @Autowired
+    private JobQueueManager jobQueueManager;
+
     @Override
     protected void executeInternal(JobExecutionContext context) {
         Thread.currentThread().setName("JobPlaybookExecutor");
@@ -72,121 +74,190 @@ public class JobPlaybookExecutor extends QuartzJobBean {
 
+    /**
+     * Moves newly created jobs into the per-environment queues and starts the next queued job
+     * of every environment that is available and not occupied by an active session.
+     */
     @Transactional(rollbackFor = Exception.class)
     public void playbookExecutor() {
-        List<JobEntity> createdList = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue()));
-        Map<String, List<JobEntity>> jobByEnvList = createdList.stream().collect(Collectors.groupingBy(JobEntity::getEnvId));
-        for (Map.Entry<String, List<JobEntity>> jobByEnv : jobByEnvList.entrySet()) {
-            String envId = jobByEnv.getKey();
-            List<JobEntity> jobList = jobByEnv.getValue();
-            T.ThreadUtil.execAsync(() -> {
-                for (JobEntity job : jobList) {
-                    List<JobEntity> JobRunList = jobService.list(new LambdaQueryWrapper<JobEntity>()
-                            .eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
-                            .eq(JobEntity::getEnvId, envId));
-                    if (T.CollUtil.isNotEmpty(JobRunList)) {
-                        continue;
-                    }
+        List<JobEntity> createdJobs = jobService.list(
+                new LambdaQueryWrapper<JobEntity>()
+                        .eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue())
+                        .orderByAsc(JobEntity::getCreateTimestamp)
+        );
 
-                    EnvironmentEntity environment = environmentService.getById(envId);
-                    if (!environment.getStatus().equals(1)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
-                        }
-                        continue;
-                    }
+        if (T.CollUtil.isNotEmpty(createdJobs)) {
+            log.info("[JobPlaybookExecutor] [playbookExecutor] [fetching created jobs] [size: {}]", createdJobs.size());
+            // persist the PENDING status first: if the update fails the jobs stay CREATED and are not queued twice
+            createdJobs.forEach(x -> x.setStatus(RunnerConstant.JobStatus.PENDING.getValue()));
+            jobService.updateBatchById(createdJobs);
+            // then hand the jobs over to the per-environment queues
+            createdJobs.forEach(jobQueueManager::addJob);
+        }
 
-                    List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
-                            .eq(EnvironmentSessionEntity::getStatus, "1")
-                            .eq(EnvironmentSessionEntity::getEnvId, envId));
-                    if (T.CollUtil.isNotEmpty(sessionList)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
-                        }
-                        continue;
-                    }
+        // process the queued jobs
+        if (!jobQueueManager.isAllQueuesEmpty()) {
+            List<JobEntity> nextJobList = jobQueueManager.fetchNextJob();
+            for (JobEntity nextJob : nextJobList) {
+                String envId = nextJob.getEnvId();
+                log.info("[JobPlaybookExecutor] [playbookExecutor] [Processing jobId: {}] [envId: {}]", nextJob.getId(), envId);
 
-                    // update job status running
-                    jobService.update(new LambdaUpdateWrapper<JobEntity>()
-                            .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
-                            .set(JobEntity::getStartTimestamp, System.currentTimeMillis())
-                            .eq(JobEntity::getId, job.getId())
-                    );
-
-                    // add session
-                    EnvironmentSessionEntity session = new EnvironmentSessionEntity();
-                    session.setEnvId(envId);
-                    session.setJobId(job.getId());
-                    session.setStatus(1);
-                    session.setUserId("system");
-                    session.setWorkspaceId(job.getWorkspaceId());
-                    session.setStartTimestamp(System.currentTimeMillis());
-                    environmentSessionService.save(session);
-
-                    HttpResponse response = requestEnvPlaybook(job, environment);
-                    log.info("[playbookExecutor] [job id: {}] [env: {}] [status: {}]", job.getId(), environment.getId(), response.getStatus());
-                    if (!response.isOk()) {
-
-                        String result = response.body();
-                        if (log.isDebugEnabled()) {
-                            log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
-                        }
-
-                        File logFile = T.FileUtil.file(job.getLogPath());
-                        T.FileUtil.appendString(String.format("ERROR: Request %s environment error \n", environment.getId()), logFile, "UTF-8");
-                        T.FileUtil.appendString(String.format("Result: %s", result), logFile, "UTF-8");
-
-                        // update job status, starTime, updateTimestamp
-                        jobService.update(new LambdaUpdateWrapper<JobEntity>()
-                                .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
-                                .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
-                                .eq(JobEntity::getId, job.getId()));
-
-                        // remove session
-                        environmentService.removeSession(session.getId());
-                        log.info("[playbookExecutor] [request env exec playbook error] [job id: {}]", job.getId());
-                    }
+                EnvironmentEntity environment = environmentService.getById(envId);
+                if (environment == null) {
+                    // the environment no longer exists, so this job can never be started
+                    log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment not found] [jobId: {}] [envId: {}]", nextJob.getId(), envId);
+                    markJobFailed(nextJob);
+                    continue;
+                }
+                if (!Integer.valueOf(1).equals(environment.getStatus())) {
+                    log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
+                    jobQueueManager.requeueJob(nextJob); // put the job back into the queue
+                    continue;
                 }
-            });
+
+                List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
+                        .eq(EnvironmentSessionEntity::getStatus, "1")
+                        .eq(EnvironmentSessionEntity::getEnvId, envId));
+                if (T.CollUtil.isNotEmpty(sessionList)) {
+                    log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
+                    jobQueueManager.requeueJob(nextJob); // put the job back into the queue
+                    continue;
+                }
+
+                // update job status running
+                jobService.update(new LambdaUpdateWrapper<JobEntity>()
+                        .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
+                        .set(JobEntity::getStartTimestamp, System.currentTimeMillis())
+                        .eq(JobEntity::getId, nextJob.getId())
+                );
+
+                // add session
+                EnvironmentSessionEntity session = new EnvironmentSessionEntity();
+                session.setEnvId(envId);
+                session.setJobId(nextJob.getId());
+                session.setStatus(1);
+                session.setUserId("system");
+                session.setWorkspaceId(nextJob.getWorkspaceId());
+                session.setStartTimestamp(System.currentTimeMillis());
+                environmentSessionService.save(session);
+
+                // execute the job
+                processJobAsync(nextJob, environment, session);
+            }
         }
     }
 
-    private HttpResponse requestEnvPlaybook(JobEntity job, EnvironmentEntity environment) {
+    /**
+     * Sends the job to its environment on a background thread.
+     *
+     * <p>When the environment rejects the request, or the request itself fails, the job is
+     * marked as failed and the session created for it is removed.
+     *
+     * @param job         job to execute
+     * @param environment environment the job is sent to
+     * @param session     environment session created for this job
+     */
+    private void processJobAsync(JobEntity job, EnvironmentEntity environment, EnvironmentSessionEntity session) {
+        T.ThreadUtil.execAsync(() -> {
+            log.info("[JobPlaybookExecutor] [processJobAsync] [start jobId: {}]", job.getId());
+            try {
+                // send the request
+                HttpResponse response = requestEnvironment(job, environment);
+                if (!response.isOk()) {
+                    String result = response.body();
+                    log.warn("[JobPlaybookExecutor] [processJobAsync] [envId: {}] [result: {}]", environment.getId(), result);
+
+                    File logFile = T.FileUtil.file(job.getLogPath());
+                    T.FileUtil.appendString(String.format("ERROR: Request %s environment error! msg: %s.\n", environment.getName(), result), logFile, "UTF-8");
+
+                    failJob(job, session);
+                }
+                log.info("[JobPlaybookExecutor] [processJobAsync] [Finished jobId: {}]", job.getId());
+            } catch (Exception e) {
+                // the scheduling thread does not wait for this task, so report the failure here
+                log.error(e, "[JobPlaybookExecutor] [processJobAsync] [error] [jobId: {}]", job.getId());
+                failJob(job, session);
+            }
+        });
+    }
+
+    /**
+     * Marks the job as failed and removes the environment session that was created for it.
+     *
+     * @param job     job that could not be executed
+     * @param session session to remove
+     */
+    private void failJob(JobEntity job, EnvironmentSessionEntity session) {
+        markJobFailed(job);
+        environmentService.removeSession(session.getId());
+    }
+
+    /**
+     * Sets the job status to FAILED and stores the current time as its end timestamp.
+     *
+     * @param job job to update
+     */
+    private void markJobFailed(JobEntity job) {
+        jobService.update(new LambdaUpdateWrapper<JobEntity>()
+                .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
+                .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
+                .eq(JobEntity::getId, job.getId()));
+    }
+
+    /**
+     * Zips the job's package and playbook files and posts them, together with the job
+     * parameters, to the environment's playbook endpoint.
+     *
+     * @param job         job to submit
+     * @param environment target environment; its parameters provide the url and token
+     * @return the response of the environment
+     * @throws RuntimeException if the archive cannot be built or the request fails
+     */
+    private HttpResponse requestEnvironment(JobEntity job, EnvironmentEntity environment) {
         File zipFile = null;
         try {
             String playbookId = job.getPlaybookId();
             String packageId = job.getPackageId();
 
+            // package playbook file
             PackageEntity packageEntity = packageService.getById(packageId);
             File packageFile = T.FileUtil.file(packageEntity.getPath());
+            PlaybookEntity playbook = playbookService.getById(playbookId);
+            File playbookFile = T.FileUtil.file(playbook.getPath());
+
+            // zip
+            zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
+            T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
+
+            JSONObject paramJSONObject = environment.getParamJSONObject();
+            String url = paramJSONObject.getStr("url");
+            String token = paramJSONObject.getStr("token");
+
+            // parameters
             String packageName = packageEntity.getIdentifier();
             String parameters = job.getParameters();
             Map<String, Object> params = T.MapUtil.newHashMap();
             if (T.StrUtil.isNotEmpty(parameters)) {
                 params = T.JSONUtil.toBean(parameters, Map.class);
-            }else {
+            } else {
                 params.put("reInstall", true);
                 params.put("clearCache", true);
                 params.put("unInstall", true);
             }
 
-            PlaybookEntity playbook = playbookService.getById(playbookId);
-            File playbookFile = T.FileUtil.file(playbook.getPath());
-
-            log.info("[playbookExecutor] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
-            JSONObject paramJSONObject = environment.getParamJSONObject();
-            String url = paramJSONObject.getStr("url");
-            String token = paramJSONObject.getStr("token");
-            zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
+            // build request
+            log.info("[JobPlaybookExecutor] [requestEnvironment] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
             HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url));
-            T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
+            request.header("Authorization", token);
             request.form("file", zipFile);
             request.form("id", job.getId());
             request.form("packageName", packageName);
-            request.header("Authorization", token);
             for (Map.Entry<String, Object> param : params.entrySet()) {
                 request.form(param.getKey(), param.getValue());
             }
             HttpResponse response = request.execute();
             return response;
+        } catch (Exception e) {
+            log.error("[JobPlaybookExecutor] [requestEnvironment] [error] [jobId: {}]", job.getId(), e);
+            throw new RuntimeException(e);
         } finally {
             T.FileUtil.del(zipFile);
         }
diff --git a/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java b/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java
new file mode 100644
index 0000000..a0dd96c
--- /dev/null
+++ b/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java
@@ -0,0 +1,61 @@
+package 
net.geedge.asw.module.runner.util;
+
+import net.geedge.asw.common.util.T;
+import net.geedge.asw.module.runner.entity.JobEntity;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory job queues: one queue per environment, ordered by job creation time.
+ * Every public method is synchronized on this instance.
+ */
+@Component
+public class JobQueueManager {
+
+    // environment id -> jobs waiting for that environment, ordered by creation time
+    private final Map<String, PriorityQueue<JobEntity>> groupedQueues = new ConcurrentHashMap<>();
+
+    /** Adds the job to the queue of its environment, creating that queue on first use. */
+    public synchronized void addJob(JobEntity job) {
+        groupedQueues.computeIfAbsent(job.getEnvId(), envId -> new PriorityQueue<>(Comparator.comparing(JobEntity::getCreateTimestamp)))
+                .offer(job);
+    }
+
+    /** Removes and returns the head job of every environment queue that is not empty. */
+    public synchronized List<JobEntity> fetchNextJob() {
+        List<JobEntity> list = T.ListUtil.list(false);
+        for (PriorityQueue<JobEntity> queue : groupedQueues.values()) {
+            if (queue == null || queue.isEmpty()) {
+                continue; // nothing queued for this environment
+            }
+            list.add(queue.poll());
+        }
+        return list;
+    }
+
+    /** Returns {@code true} when no environment queue holds a job. */
+    public synchronized boolean isAllQueuesEmpty() {
+        return groupedQueues.values().stream().allMatch(PriorityQueue::isEmpty);
+    }
+
+    /**
+     * Puts a job back into the queue of its environment.
+     * Delegates to {@link #addJob} so a missing queue is created instead of causing a NullPointerException.
+     */
+    public synchronized void requeueJob(JobEntity job) {
+        addJob(job);
+    }
+
+    /**
+     * Removes the job from the queue of its environment; does nothing when it is not queued.
+     * Jobs are matched by id, so an instance loaded separately from the queued one still matches.
+     */
+    public synchronized void removeJob(JobEntity job) {
+        PriorityQueue<JobEntity> queue = groupedQueues.get(job.getEnvId());
+        if (queue != null) {
+            queue.removeIf(queued -> Objects.equals(queued.getId(), job.getId()));
+        }
+    }
+}