Skip to content

Commit 4a5b916

Browse files
fix(flows): avoid failing flow dependencies with dynamic defaults (#11166)
closes #11117
1 parent f7b2af1 commit 4a5b916

File tree

3 files changed

+112
-10
lines changed

3 files changed

+112
-10
lines changed

core/src/main/java/io/kestra/core/runners/RunVariables.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.kestra.core.models.flows.Input;
1111
import io.kestra.core.models.flows.State;
1212
import io.kestra.core.models.flows.input.SecretInput;
13+
import io.kestra.core.models.property.Property;
1314
import io.kestra.core.models.property.PropertyContext;
1415
import io.kestra.core.models.tasks.Task;
1516
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -282,15 +283,15 @@ public Map<String, Object> build(final RunContextLogger logger, final PropertyCo
282283

283284
if (flow != null && flow.getInputs() != null) {
284285
// we add default inputs value from the flow if not already set, this will be useful for triggers
285-
flow.getInputs().stream()
286-
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
287-
.forEach(input -> {
288-
try {
289-
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
290-
} catch (IllegalVariableEvaluationException e) {
291-
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
292-
}
293-
});
286+
flow.getInputs().stream()
287+
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
288+
.forEach(input -> {
289+
try {
290+
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
291+
} catch (IllegalVariableEvaluationException e) {
292+
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
293+
}
294+
});
294295
}
295296

296297
if (!inputs.isEmpty()) {

core/src/test/java/io/kestra/core/runners/RunVariablesTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package io.kestra.core.runners;
22

3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.executions.Execution;
5+
import io.kestra.core.models.flows.DependsOn;
36
import io.kestra.core.models.flows.Flow;
7+
import io.kestra.core.models.flows.Type;
8+
import io.kestra.core.models.flows.input.BoolInput;
9+
import io.kestra.core.models.property.Property;
410
import io.kestra.core.models.property.PropertyContext;
511
import io.kestra.core.models.tasks.Task;
612
import io.kestra.core.models.triggers.AbstractTrigger;
13+
import io.kestra.core.runners.pebble.functions.SecretFunction;
14+
import io.kestra.core.utils.IdUtils;
15+
import io.micronaut.context.ApplicationContext;
716
import org.junit.jupiter.api.Assertions;
817
import org.junit.jupiter.api.Test;
918
import org.mockito.Mockito;
1019

20+
import java.util.Collections;
21+
import java.util.List;
1122
import java.util.Map;
1223

1324
import static org.assertj.core.api.Assertions.assertThat;
@@ -112,4 +123,25 @@ void shouldGetKestraConfiguration() {
112123
assertThat(kestra.get("environment")).isEqualTo("test");
113124
assertThat(kestra.get("url")).isEqualTo("http://localhost:8080");
114125
}
115-
}
126+
127+
@Test
128+
void nonResolvableDynamicInputsShouldBeSkipped() throws IllegalVariableEvaluationException {
129+
Map<String, Object> variables = new RunVariables.DefaultBuilder()
130+
.withFlow(Flow
131+
.builder()
132+
.namespace("a.b")
133+
.id("c")
134+
.inputs(List.of(
135+
BoolInput.builder().id("a").type(Type.BOOL).defaults(Property.ofValue(true)).build(),
136+
BoolInput.builder().id("b").type(Type.BOOL).dependsOn(new DependsOn(List.of("a"), null)).defaults(Property.ofExpression("{{inputs.a == true}}")).build()
137+
))
138+
.build()
139+
)
140+
.withExecution(Execution.builder().id(IdUtils.create()).build())
141+
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(Mockito.mock(ApplicationContext.class), Mockito.mock(VariableRenderer.VariableConfiguration.class), Collections.emptyList())));
142+
143+
Assertions.assertEquals(Map.of(
144+
"a", true
145+
), variables.get("inputs"));
146+
}
147+
}

core/src/test/java/io/kestra/core/topologies/FlowTopologyTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,75 @@ void should_findDependencies_cyclicTriggers() throws FlowProcessingException {
199199

200200
}
201201

202+
@Test
203+
void flowTriggerWithTargetFlow() throws FlowProcessingException {
204+
// Given
205+
var tenantId = randomTenantId();
206+
var parent = flowService.importFlow(tenantId,
207+
"""
208+
id: parent
209+
namespace: io.kestra.unittest
210+
inputs:
211+
- id: a
212+
type: BOOL
213+
defaults: true
214+
215+
- id: b
216+
type: BOOL
217+
defaults: "{{ inputs.a == true }}"
218+
dependsOn:
219+
inputs:
220+
- a
221+
tasks:
222+
- id: helloA
223+
type: io.kestra.plugin.core.log.Log
224+
message: Hello A
225+
""");
226+
var child = flowService.importFlow(tenantId, """
227+
id: child
228+
namespace: io.kestra.unittest
229+
tasks:
230+
- id: helloB
231+
type: io.kestra.plugin.core.log.Log
232+
message: Hello B
233+
triggers:
234+
- id: release
235+
type: io.kestra.plugin.core.trigger.Flow
236+
states:
237+
- SUCCESS
238+
preconditions:
239+
id: flows
240+
flows:
241+
- namespace: io.kestra.unittest
242+
flowId: parent
243+
""");
244+
var unrelatedFlow = flowService.importFlow(tenantId, """
245+
id: unrelated_flow
246+
namespace: io.kestra.unittest
247+
tasks:
248+
- id: download
249+
type: io.kestra.plugin.core.http.Download
250+
""");
251+
252+
// When
253+
computeAndSaveTopologies(List.of(child, parent, unrelatedFlow));
254+
System.out.println();
255+
flowTopologyRepository.findAll(tenantId).forEach(topology -> {
256+
System.out.println(FlowTopologyTestData.of(topology));
257+
});
258+
259+
var dependencies = flowService.findDependencies(tenantId, "io.kestra.unittest", parent.getId(), false, true);
260+
flowTopologyRepository.findAll(tenantId).forEach(topology -> {
261+
System.out.println(FlowTopologyTestData.of(topology));
262+
});
263+
264+
// Then
265+
assertThat(dependencies.map(FlowTopologyTestData::of))
266+
.containsExactlyInAnyOrder(
267+
new FlowTopologyTestData(parent, child)
268+
);
269+
}
270+
202271
/**
203272
* this function mimics the production behaviour
204273
*/

0 commit comments

Comments
 (0)