Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion backend/src/crd/kubernetes/v2beta1/pipelineversion_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ func (p *PipelineVersion) ToModel() (*model.PipelineVersion, error) {
piplineSpecAndPlatformSpec = append(piplineSpecAndPlatformSpec, platformSpecBytes...)
}

// The pipeline spec in model.PipelineVersion ignores platform specs that don't have a "kubernetes" platform.
// This additional parsing filters out platform specs that normally are excluded when the pipeline version is
// created through the REST API. This is done rather than modifying the mutating webhook to remove these
// platform specs so that GitOps tools don't see a diff from what is in Git and what is on the cluster.
v2Spec, err := template.NewV2SpecTemplate(piplineSpecAndPlatformSpec, false, nil)
if err != nil {
return nil, fmt.Errorf("failed to parse the pipeline spec: %w", err)
}

pipelineVersionStatus := model.PipelineVersionCreating

for _, condition := range p.Status.Conditions {
Expand Down Expand Up @@ -214,7 +223,7 @@ func (p *PipelineVersion) ToModel() (*model.PipelineVersion, error) {
Status: pipelineVersionStatus,
CodeSourceUrl: p.Spec.CodeSourceURL,
Description: p.Spec.Description,
PipelineSpec: string(piplineSpecAndPlatformSpec),
PipelineSpec: string(v2Spec.Bytes()),
PipelineSpecURI: p.Spec.PipelineSpecURI,
}, nil
}
Expand Down
81 changes: 81 additions & 0 deletions backend/src/crd/kubernetes/v2beta1/pipelineversion_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,67 @@ func TestToModel_PipelineAndPlatformSpecs(t *testing.T) {
assert.Contains(t, result.PipelineSpec, "---")
}

func TestToModel_PipelineAndPlatformSpecs_WithNonKubernetesPlatform(t *testing.T) {
pipelineVersion := &PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-version",
Namespace: "default",
UID: "version-456",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: GroupVersion.String(),
Kind: "Pipeline",
UID: "pipeline-123",
Name: "test-pipeline",
},
},
},
Spec: PipelineVersionSpec{
DisplayName: "Test Version",
Description: "A test version",
PipelineName: "test-pipeline",
CodeSourceURL: "https://github.com/test/pipeline",
PipelineSpecURI: "gs://bucket/pipeline.yaml",
PipelineSpec: IRSpec{
Value: map[string]interface{}{
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline",
"displayName": "Test Pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.13.0",
},
},
PlatformSpec: &IRSpec{
Value: map[string]interface{}{
"platforms": map[string]interface{}{
"not-kubernetes": map[string]interface{}{
"pipelineConfig": map[string]interface{}{
"workspace": map[string]interface{}{
"size": "10Gi",
},
},
},
},
},
},
},
}

result, err := pipelineVersion.ToModel()
require.NoError(t, err)
assert.NotNil(t, result)

// Check that the platform spec is not included
assert.NotContains(t, result.PipelineSpec, "platforms:")
assert.NotContains(t, result.PipelineSpec, "---")
}

func TestToModel_WithStatus(t *testing.T) {
pipelineVersion := &PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -426,6 +487,11 @@ func TestToModel_WithStatus(t *testing.T) {
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
},
},
Expand Down Expand Up @@ -471,6 +537,11 @@ func TestToModel_DisplayNameFallback(t *testing.T) {
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
},
},
Expand Down Expand Up @@ -498,6 +569,11 @@ func TestToModel_InvalidPipelineSpec(t *testing.T) {
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
},
},
Expand Down Expand Up @@ -526,6 +602,11 @@ func TestToModel_InvalidPlatformSpec(t *testing.T) {
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
},
},
Expand Down
27 changes: 26 additions & 1 deletion sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,8 +1100,18 @@ def my_pipeline(input1: str):
pipeline_name)
self.assertEqual(pipeline_version_manifest['metadata']['namespace'],
namespace)
self.assertNotIn('platformSpec', pipeline_version_manifest['spec'])

# Test with include_pipeline_manifest=False and has a platform spec
@dsl.pipeline(
name='my-pipeline',
description='A simple test pipeline with platform spec',
pipeline_config=dsl.PipelineConfig(
workspace=dsl.WorkspaceConfig(size='25Gi'),),
)
def my_pipeline(input1: str):
print_op(message=input1)

# Test with include_pipeline_manifest=False
package_path2 = os.path.join(tmpdir, 'pipeline2.yaml')
kubernetes_manifest_options2 = KubernetesManifestOptions(
pipeline_name=pipeline_name,
Expand Down Expand Up @@ -1136,6 +1146,21 @@ def my_pipeline(input1: str):
pipeline_name)
self.assertEqual(
pipeline_version_manifest2['metadata']['namespace'], namespace)
self.assertEqual(
pipeline_version_manifest2['spec']['platformSpec'], {
'platforms': {
'kubernetes': {
'pipelineConfig': {
'workspace': {
'kubernetes': {
'pvcSpecPatch': {}
},
'size': '25Gi'
}
}
}
}
})


class TestCompilePipelineCaching(unittest.TestCase):
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2121,9 +2121,12 @@ def _write_kubernetes_manifest_to_file(
'displayName': pipeline_version_display_name,
'pipelineName': pipeline_name,
'pipelineSpec': pipeline_spec_dict,
'platformSpec': platform_spec_dict,
},
}

if platform_spec_dict:
pipeline_version_manifest['spec']['platformSpec'] = platform_spec_dict

if pipeline_description:
pipeline_version_manifest['spec']['description'] = pipeline_description
documents.append(pipeline_version_manifest)
Expand Down
Loading