Merge remote-tracking branch 'origin/haskafka' into haskafka
This commit is contained in:
@@ -12,6 +12,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
@@ -130,12 +131,8 @@ public class StateHandler {
|
||||
private Boolean handleDynamicTaskStart(TaskService taskService, Task task) {
|
||||
// 将所有关联的动态规则审批状态修改为“已使用”
|
||||
taskService.updateDynamicRuleAuditStatusInTask(task.getTaskId(), AuditStatusEnum.USING);
|
||||
try{
|
||||
return sendFilters(taskService, task);
|
||||
} catch (Exception e) {
|
||||
log.error("动态任务筛选条件发送出错", e);
|
||||
return true;
|
||||
}
|
||||
return sendFilters(taskService, task);
|
||||
|
||||
// return true;
|
||||
}
|
||||
|
||||
@@ -152,8 +149,14 @@ public class StateHandler {
|
||||
|
||||
List<String> commandUUIDs= commandService.createCommands(staticTaskCommandInfos);
|
||||
// 将command新建信号发送到c3下发程序
|
||||
sendCommandDistributeSignal(commandUUIDs);
|
||||
return true;
|
||||
try {
|
||||
sendCommandDistributeSignal(commandUUIDs);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error(String.format("静态任务%d 首次指令下发c3出错",task.getTaskId()));
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
public Boolean sendCommandDistributeSignal(List<String> commandUUIDs) {
|
||||
|
||||
@@ -168,7 +171,7 @@ public class StateHandler {
|
||||
Mono<Map> mono = client_commandDistribute.post()
|
||||
.uri("/rule")
|
||||
.bodyValue(commandIDMaps)
|
||||
.accept(MediaType.APPLICATION_JSON) // 设置Accept头为application/json
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.exchangeToMono(res -> {
|
||||
if (res.statusCode().equals(HttpStatus.OK)) {
|
||||
return res.bodyToMono(Map.class);
|
||||
@@ -178,14 +181,15 @@ public class StateHandler {
|
||||
.doOnError(WebClientResponseException.class, res -> success.set(false));
|
||||
|
||||
|
||||
Map<String, String> response = mono.block(Duration.ofSeconds(5));
|
||||
Map<String, Integer> response = mono.block(Duration.ofSeconds(5));
|
||||
|
||||
if (response == null) {
|
||||
return false;
|
||||
}
|
||||
response.forEach((commandUUID, responseCode) -> {
|
||||
if (!responseCode.equals("0")) {
|
||||
log.error("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
|
||||
log.info("指令首次下发成功, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
|
||||
if (responseCode != 0) {
|
||||
log.warn("指令首次下发失败, 指令uuid: " + commandUUID + ", responseCode: " + responseCode);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user