From b843af4544fde85816b74c5337ef270607c41599 Mon Sep 17 00:00:00 2001 From: Anoop Panicker <34087882+apanicker-nflx@users.noreply.github.com> Date: Sat, 10 Sep 2022 00:44:28 -0700 Subject: [PATCH] fix subworkflow output during repair (#3232) --- .../conductor/core/execution/tasks/SubWorkflow.java | 2 +- .../core/reconciliation/WorkflowRepairService.java | 12 +++++++++--- .../reconciliation/TestWorkflowRepairService.java | 9 +++++++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index d58b3c0050..29273291dc 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -205,7 +205,7 @@ private void updateTaskStatus(WorkflowModel subworkflow, TaskModel task) { task.setExternalOutputPayloadStoragePath( subworkflow.getExternalOutputPayloadStoragePath()); } else { - task.getOutputData().putAll(subworkflow.getOutput()); + task.addOutput(subworkflow.getOutput()); } if (!status.isSuccessful()) { task.setReasonForIncompletion( diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java index cf6f9a055c..f24aa9b1f6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java @@ -143,7 +143,12 @@ boolean verifyAndRepairTask(TaskModel task) { && task.getStatus() == TaskModel.Status.IN_PROGRESS) { WorkflowModel subWorkflow = executionDAO.getWorkflow(task.getSubWorkflowId(), false); if (subWorkflow.getStatus().isTerminal()) { - repairSubWorkflowTask(task, subWorkflow.getStatus()); + LOGGER.info( + "Repairing sub workflow task {} for sub workflow {} in workflow {}", + task.getTaskId(), + task.getSubWorkflowId(), + task.getWorkflowInstanceId()); + repairSubWorkflowTask(task, subWorkflow); return true; } } @@ -165,8 +170,8 @@ private boolean verifyAndRepairWorkflow(String workflowId) { return false; } - private void repairSubWorkflowTask(TaskModel task, WorkflowModel.Status subWorkflowStatus) { - switch (subWorkflowStatus) { + private void repairSubWorkflowTask(TaskModel task, WorkflowModel subWorkflow) { + switch (subWorkflow.getStatus()) { case COMPLETED: task.setStatus(TaskModel.Status.COMPLETED); break; @@ -180,6 +185,7 @@ private void repairSubWorkflowTask(TaskModel task, WorkflowModel.Status subWorkf task.setStatus(TaskModel.Status.TIMED_OUT); break; } + task.addOutput(subWorkflow.getOutput()); executionDAO.updateTask(task); } } diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowRepairService.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowRepairService.java index ef0e6be9c4..15b07bb919 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowRepairService.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowRepairService.java @@ -13,6 +13,7 @@ package com.netflix.conductor.core.reconciliation; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import org.junit.Before; @@ -239,10 +240,14 @@ public void assertInProgressSubWorkflowSystemTasksAreCheckedAndRepaired() { task.setTaskId(taskId); task.setCallbackAfterSeconds(60); task.setSubWorkflowId(subWorkflowId); + Map outputMap = new HashMap<>(); + outputMap.put("subWorkflowId", subWorkflowId); + task.setOutputData(outputMap); WorkflowModel subWorkflow = new WorkflowModel(); subWorkflow.setWorkflowId(subWorkflowId); subWorkflow.setStatus(WorkflowModel.Status.TERMINATED); + subWorkflow.setOutput(Map.of("k1", "v1", "k2", "v2")); when(executionDAO.getWorkflow(subWorkflowId, false)).thenReturn(subWorkflow); @@ -256,5 +261,9 @@ public void assertInProgressSubWorkflowSystemTasksAreCheckedAndRepaired() { assertEquals(taskId, argumentCaptor.getValue().getTaskId()); assertEquals(subWorkflowId, argumentCaptor.getValue().getSubWorkflowId()); assertEquals(TaskModel.Status.CANCELED, argumentCaptor.getValue().getStatus()); + assertNotNull(argumentCaptor.getValue().getOutputData()); + assertEquals(subWorkflowId, argumentCaptor.getValue().getOutputData().get("subWorkflowId")); + assertEquals("v1", argumentCaptor.getValue().getOutputData().get("k1")); + assertEquals("v2", argumentCaptor.getValue().getOutputData().get("k2")); } }