Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions core/src/main/java/io/kestra/core/runners/RunVariables.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down Expand Up @@ -282,15 +283,15 @@ public Map<String, Object> build(final RunContextLogger logger, final PropertyCo

if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
}
});
}

if (!inputs.isEmpty()) {
Expand Down
34 changes: 33 additions & 1 deletion core/src/test/java/io/kestra/core/runners/RunVariablesTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
package io.kestra.core.runners;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.BoolInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -112,4 +123,25 @@ void shouldGetKestraConfiguration() {
assertThat(kestra.get("environment")).isEqualTo("test");
assertThat(kestra.get("url")).isEqualTo("http://localhost:8080");
}
}

@Test
void nonResolvableDynamicInputsShouldBeSkipped() throws IllegalVariableEvaluationException {
Map<String, Object> variables = new RunVariables.DefaultBuilder()
.withFlow(Flow
.builder()
.namespace("a.b")
.id("c")
.inputs(List.of(
BoolInput.builder().id("a").type(Type.BOOL).defaults(Property.ofValue(true)).build(),
BoolInput.builder().id("b").type(Type.BOOL).dependsOn(new DependsOn(List.of("a"), null)).defaults(Property.ofExpression("{{inputs.a == true}}")).build()
))
.build()
)
.withExecution(Execution.builder().id(IdUtils.create()).build())
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(Mockito.mock(ApplicationContext.class), Mockito.mock(VariableRenderer.VariableConfiguration.class), Collections.emptyList())));

Assertions.assertEquals(Map.of(
"a", true
), variables.get("inputs"));
}
}
69 changes: 69 additions & 0 deletions core/src/test/java/io/kestra/core/topologies/FlowTopologyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,75 @@ void should_findDependencies_cyclicTriggers() throws FlowProcessingException {

}

@Test
void flowTriggerWithTargetFlow() throws FlowProcessingException {
// Given
var tenantId = randomTenantId();
var parent = flowService.importFlow(tenantId,
"""
id: parent
namespace: io.kestra.unittest
inputs:
- id: a
type: BOOL
defaults: true

- id: b
type: BOOL
defaults: "{{ inputs.a == true }}"
dependsOn:
inputs:
- a
tasks:
- id: helloA
type: io.kestra.plugin.core.log.Log
message: Hello A
""");
var child = flowService.importFlow(tenantId, """
id: child
namespace: io.kestra.unittest
tasks:
- id: helloB
type: io.kestra.plugin.core.log.Log
message: Hello B
triggers:
- id: release
type: io.kestra.plugin.core.trigger.Flow
states:
- SUCCESS
preconditions:
id: flows
flows:
- namespace: io.kestra.unittest
flowId: parent
""");
var unrelatedFlow = flowService.importFlow(tenantId, """
id: unrelated_flow
namespace: io.kestra.unittest
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
""");

// When
computeAndSaveTopologies(List.of(child, parent, unrelatedFlow));
System.out.println();
flowTopologyRepository.findAll(tenantId).forEach(topology -> {
System.out.println(FlowTopologyTestData.of(topology));
});

var dependencies = flowService.findDependencies(tenantId, "io.kestra.unittest", parent.getId(), false, true);
flowTopologyRepository.findAll(tenantId).forEach(topology -> {
System.out.println(FlowTopologyTestData.of(topology));
});

// Then
assertThat(dependencies.map(FlowTopologyTestData::of))
.containsExactlyInAnyOrder(
new FlowTopologyTestData(parent, child)
);
}

/**
* this function mimics the production behaviour
*/
Expand Down
Loading