Skip to content

Commit c03127d

Browse files
mprahl and rimolive authored
feat(backend): Add the Kubernetes native pipeline store (#11881)
* Add the Kubernetes native pipeline store

  This also improves cache update race conditions in the webhooks.

  Co-authored-by: Matt Prahl <[email protected]>
  Signed-off-by: Ricardo M. Oliveira <[email protected]>

* Use controller-runtime for the non-caching client

  Signed-off-by: mprahl <[email protected]>

* Put the Kubernetes native CI manifests under the Argo manifests

  Signed-off-by: mprahl <[email protected]>

* Fix the flaky Kubernetes pipeline store tests

  Some tests set the viper configuration of POD_NAMESPACE while others didn't and so the order of the tests mattered. This now sets and resets the viper configuration for each test.

  Signed-off-by: mprahl <[email protected]>

* Modify the suggested pipeline version name to be a valid K8s name

  This is more important for the Kubernetes pipeline store.

  Signed-off-by: mprahl <[email protected]>

* Fix the ml-pipeline Service ports in K8s native mode

  The KFP UI automatically uses the first port listed in the ml-pipeline Service to communicate with the KFP API. Using a JSON patch to add the webhook port ensures it doesn't change the order.

  Signed-off-by: mprahl <[email protected]>

---------

Signed-off-by: Ricardo M. Oliveira <[email protected]>
Signed-off-by: mprahl <[email protected]>
Co-authored-by: Ricardo M. Oliveira <[email protected]>
1 parent d90e4e8 commit c03127d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1693
-231
lines changed

.github/actions/kfp-cluster/action.yml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ inputs:
55
k8s_version:
66
description: "The Kubernetes version to use for the Kind cluster"
77
required: true
8+
pipeline_store:
9+
description: "Flag to deploy KFP with K8s Native API"
10+
default: 'database'
11+
required: false
812
proxy:
913
description: "If KFP should be deployed with proxy configuration"
1014
required: false
@@ -39,8 +43,12 @@ runs:
3943
- name: Deploy KFP
4044
shell: bash
4145
run: |
46+
ARGS=""
47+
4248
if [ "${{ inputs.proxy }}" = "true" ]; then
43-
./.github/resources/scripts/deploy-kfp.sh --proxy
44-
else
45-
./.github/resources/scripts/deploy-kfp.sh
49+
ARGS="${ARGS} --proxy"
50+
elif [ "${{inputs.pipeline_store }}" = "kubernetes" ]; then
51+
ARGS="${ARGS} --deploy-k8s-native"
4652
fi
53+
54+
./.github/resources/scripts/deploy-kfp.sh $ARGS
File renamed without changes.
File renamed without changes.

.github/resources/manifests/kubernetes-native/overlays/proxy/kustomization.yaml

Lines changed: 0 additions & 11 deletions
This file was deleted.

.github/resources/manifests/kubernetes-native/overlays/proxy/proxy-env.yaml

Lines changed: 0 additions & 16 deletions
This file was deleted.

.github/resources/scripts/deploy-kfp.sh

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,55 @@ C_DIR="${BASH_SOURCE%/*}"
2424
if [[ ! -d "$C_DIR" ]]; then C_DIR="$PWD"; fi
2525
source "${C_DIR}/helper-functions.sh"
2626

27+
TEST_MANIFESTS=".github/resources/manifests/argo"
28+
PIPELINES_STORE="database"
2729
USE_PROXY=false
2830

29-
while getopts ":p-:" OPT; do
30-
case $OPT in
31-
-) [ "$OPTARG" = "proxy" ] && USE_PROXY=true || { echo "Unknown option --$OPTARG"; exit 1; };;
32-
\?) echo "Invalid option: -$OPTARG" >&2; exit 1;;
33-
esac
31+
# Parse the arguments passed to the script. A single case block is used so
# that alternative deployments can be added as new branches in the future.
#   --deploy-k8s-native  sets PIPELINES_STORE to "kubernetes"
#   --proxy              sets USE_PROXY to true
# Any other argument is rejected: without the default branch an unrecognized
# argument would never be shifted and the loop would never terminate.
while [ "$#" -gt 0 ]; do
  case "$1" in
    --deploy-k8s-native)
      PIPELINES_STORE="kubernetes"
      shift
      ;;
    --proxy)
      USE_PROXY=true
      shift
      ;;
    *)
      echo "Unknown option $1" >&2
      exit 1
      ;;
  esac
done
3546

36-
shift $((OPTIND-1))
47+
# Refuse to continue when both the proxy and the Kubernetes pipeline store
# were requested. Each comparison needs its own test command: "&&" cannot
# appear inside a single "[ ... ]", where it makes the test fail with a
# syntax error so the guard never triggers.
if [ "${USE_PROXY}" == "true" ] && [ "${PIPELINES_STORE}" == "kubernetes" ]; then
  echo "ERROR: Kubernetes Pipeline store cannot be deployed with proxy support."
  exit 1
fi
3751

3852
kubectl apply -k "manifests/kustomize/cluster-scoped-resources/"
39-
kubectl apply -k "manifests/kustomize/base/crds"
4053
kubectl wait crd/applications.app.k8s.io --for condition=established --timeout=60s || EXIT_CODE=$?
4154
if [[ $EXIT_CODE -ne 0 ]]
4255
then
4356
echo "Failed to deploy cluster-scoped resources."
4457
exit $EXIT_CODE
4558
fi
4659

47-
#Install cert-manager
48-
make -C ./backend install-cert-manager || EXIT_CODE=$?
49-
if [[ $EXIT_CODE -ne 0 ]]
50-
then
51-
echo "Failed to deploy cert-manager."
52-
exit $EXIT_CODE
53-
fi
54-
55-
# Deploy manifest
56-
TEST_MANIFESTS=".github/resources/manifests/argo"
57-
58-
if [[ "$PIPELINE_STORE" == "kubernetes" ]]; then
59-
TEST_MANIFESTS=".github/resources/manifests/kubernetes-native"
60+
# If pipelines store is set to 'kubernetes', cert-manager must be deployed
61+
if [ "${PIPELINES_STORE}" == "kubernetes" ]; then
62+
#Install cert-manager
63+
make -C ./backend install-cert-manager || EXIT_CODE=$?
64+
if [[ $EXIT_CODE -ne 0 ]]
65+
then
66+
echo "Failed to deploy cert-manager."
67+
exit $EXIT_CODE
68+
fi
6069
fi
6170

71+
# Manifests will be deployed according to the flag provided
6272
if $USE_PROXY; then
6373
TEST_MANIFESTS="${TEST_MANIFESTS}/overlays/proxy"
74+
elif [ "${PIPELINES_STORE}" == "kubernetes" ]; then
75+
TEST_MANIFESTS="${TEST_MANIFESTS}/overlays/kubernetes-native"
6476
else
6577
TEST_MANIFESTS="${TEST_MANIFESTS}/overlays/no-proxy"
6678
fi

.github/workflows/e2e-test.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,9 @@ jobs:
152152
runs-on: ubuntu-latest
153153
strategy:
154154
matrix:
155+
pipeline_store: [ "database", "kubernetes" ]
155156
k8s_version: [ "v1.29.2", "v1.30.2", "v1.31.0" ]
156-
name: API integration tests v2 - K8s ${{ matrix.k8s_version }}
157+
name: API integration tests v2 - K8s with ${{ matrix.pipeline_store }} ${{ matrix.k8s_version }}
157158
steps:
158159
- name: Checkout code
159160
uses: actions/checkout@v4
@@ -167,6 +168,7 @@ jobs:
167168
uses: ./.github/actions/kfp-cluster
168169
with:
169170
k8s_version: ${{ matrix.k8s_version }}
171+
pipeline_store: ${{ matrix.pipeline_store }}
170172

171173
- name: Forward API port
172174
run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888
@@ -187,7 +189,7 @@ jobs:
187189
if: always()
188190
uses: actions/upload-artifact@v4
189191
with:
190-
name: kfp-api-integration-tests-v2-artifacts-k8s-${{ matrix.k8s_version }}
192+
name: kfp-api-integration-tests-v2-artifacts-k8s-${{ matrix.k8s_version }}-${{ matrix.pipeline_store }}
191193
path: /tmp/tmp*/*
192194

193195
api-integration-tests-v2-with-proxy:

.github/workflows/kfp-webhooks.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ jobs:
2929
uses: ./.github/actions/kfp-cluster
3030
with:
3131
k8s_version: ${{ matrix.k8s_version }}
32-
env:
33-
PIPELINE_STORE: kubernetes
32+
pipeline_store: kubernetes
3433
continue-on-error: true
3534

3635
- name: Run Webhook Integration Tests

.github/workflows/validate-generated-files.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ on:
1515
- 'api/**/*.go'
1616
- 'kubernetes_platform/**/*.proto'
1717
- 'kubernetes_platform/**/*.go'
18+
- 'backend/src/crd/kubernetes/**/*.go'
19+
- 'manifests/kustomize/base/crds/*.yaml'
1820
- '!**/*.md'
1921
- '!**/OWNERS'
2022

@@ -49,7 +51,10 @@ jobs:
4951
- name: Generate kfp-kubernetes proto files from source
5052
working-directory: ./kubernetes_platform
5153
run: make clean all
52-
54+
55+
- name: Generate K8s Native API CRDs
56+
working-directory: ./backend/src/crd/kubernetes
57+
run: make generate manifests
58+
5359
- name: Check for Changes
5460
run: make check-diff
55-

backend/src/apiserver/client_manager/client_manager.go

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
package clientmanager
1616

1717
import (
18+
"context"
1819
"database/sql"
1920
"fmt"
2021
"os"
2122
"strings"
23+
"sync"
2224
"time"
2325

2426
"github.com/cenkalti/backoff"
@@ -36,6 +38,7 @@ import (
3638
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
3739
"github.com/minio/minio-go/v6"
3840
"k8s.io/apimachinery/pkg/runtime"
41+
"sigs.k8s.io/controller-runtime/pkg/cache"
3942
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
4043
)
4144

@@ -108,12 +111,28 @@ type ClientManager struct {
108111
uuid util.UUIDGeneratorInterface
109112
authenticators []auth.Authenticator
110113
controllerClient ctrlclient.Client
114+
controllerClientNoCache ctrlclient.Client
115+
}
116+
117+
// Options to pass to Client Manager initialization
118+
type Options struct {
119+
UsePipelineKubernetesStorage bool
120+
Context context.Context
121+
WaitGroup *sync.WaitGroup
111122
}
112123

113124
func (c *ClientManager) TaskStore() storage.TaskStoreInterface {
114125
return c.taskStore
115126
}
116127

128+
func (c *ClientManager) ControllerClient(cacheEnabled bool) ctrlclient.Client {
129+
if cacheEnabled {
130+
return c.controllerClient
131+
}
132+
133+
return c.controllerClientNoCache
134+
}
135+
117136
func (c *ClientManager) ExperimentStore() storage.ExperimentStoreInterface {
118137
return c.experimentStore
119138
}
@@ -182,48 +201,86 @@ func (c *ClientManager) Authenticators() []auth.Authenticator {
182201
return c.authenticators
183202
}
184203

185-
func (c *ClientManager) ControllerClient() ctrlclient.Client {
186-
return c.controllerClient
187-
}
204+
func (c *ClientManager) init(options *Options) error {
205+
// time
206+
c.time = util.NewRealTime()
188207

189-
func (c *ClientManager) init() error {
190-
glog.Info("Initializing controller client...")
208+
// UUID generator
209+
c.uuid = util.NewUUIDGenerator()
191210

192-
restConfig, err := util.GetKubernetesConfig()
193-
if err != nil {
194-
return fmt.Errorf("failed to initialize the RestConfig: %w", err)
195-
}
211+
var pipelineStoreForRef storage.PipelineStoreInterface
196212

197-
controllerClient, err := ctrlclient.New(
198-
restConfig, ctrlclient.Options{Scheme: scheme},
199-
)
200-
if err != nil {
201-
return fmt.Errorf("failed to initialize the controller client: %w", err)
202-
}
203-
c.controllerClient = controllerClient
204-
glog.Info("Controller client initialized successfully.")
213+
if options.UsePipelineKubernetesStorage || common.IsOnlyKubernetesWebhookMode() {
214+
glog.Info("Initializing controller client...")
215+
restConfig, err := util.GetKubernetesConfig()
216+
if err != nil {
217+
return err
218+
}
205219

206-
if common.IsOnlyKubernetesWebhookMode() {
207-
return nil
220+
var cacheConfig map[string]cache.Config
221+
222+
if !common.IsMultiUserMode() && common.GetPodNamespace() != "" && !common.IsOnlyKubernetesWebhookMode() {
223+
cacheConfig = map[string]cache.Config{common.GetPodNamespace(): {}}
224+
}
225+
226+
k8sAPICache, err := cache.New(restConfig,
227+
cache.Options{
228+
DefaultNamespaces: cacheConfig,
229+
Scheme: scheme,
230+
},
231+
)
232+
if err != nil {
233+
return err
234+
}
235+
236+
options.WaitGroup.Add(1)
237+
go func() {
238+
defer options.WaitGroup.Done()
239+
240+
err := k8sAPICache.Start(options.Context)
241+
if err != nil {
242+
panic(fmt.Sprintf("Failed to start the cache to the cluster: %v", err))
243+
}
244+
}()
245+
246+
controllerClient, err := ctrlclient.New(
247+
restConfig, ctrlclient.Options{Scheme: scheme, Cache: &ctrlclient.CacheOptions{Reader: k8sAPICache}},
248+
)
249+
if err != nil {
250+
return fmt.Errorf("failed to initialize the controller client: %w", err)
251+
}
252+
253+
controllerClientNoCache, err := ctrlclient.New(restConfig, ctrlclient.Options{Scheme: scheme})
254+
if err != nil {
255+
return fmt.Errorf("failed to initialize the no cache controller client: %w", err)
256+
}
257+
258+
glog.Info("Controller client initialized successfully.")
259+
260+
c.controllerClient = controllerClient
261+
c.controllerClientNoCache = controllerClientNoCache
262+
if common.IsOnlyKubernetesWebhookMode() {
263+
return nil
264+
}
265+
266+
c.pipelineStore = storage.NewPipelineStoreKubernetes(controllerClient, controllerClientNoCache)
267+
pipelineStoreForRef = c.pipelineStore
208268
}
209269

210270
glog.Info("Initializing client manager")
211271
glog.Info("Initializing DB client...")
212272
db := InitDBClient(common.GetDurationConfig(initConnectionTimeout))
213273
db.SetConnMaxLifetime(common.GetDurationConfig(dbConMaxLifeTime))
214274
glog.Info("DB client initialized successfully")
215-
// time
216-
c.time = util.NewRealTime()
217-
218-
// UUID generator
219-
c.uuid = util.NewUUIDGenerator()
220275

221276
c.db = db
277+
if !options.UsePipelineKubernetesStorage {
278+
c.pipelineStore = storage.NewPipelineStore(db, c.time, c.uuid)
279+
}
222280
c.experimentStore = storage.NewExperimentStore(db, c.time, c.uuid)
223-
c.pipelineStore = storage.NewPipelineStore(db, c.time, c.uuid)
224-
c.jobStore = storage.NewJobStore(db, c.time)
281+
c.jobStore = storage.NewJobStore(db, c.time, pipelineStoreForRef)
225282
c.taskStore = storage.NewTaskStore(db, c.time, c.uuid)
226-
c.resourceReferenceStore = storage.NewResourceReferenceStore(db)
283+
c.resourceReferenceStore = storage.NewResourceReferenceStore(db, pipelineStoreForRef)
227284
c.dBStatusStore = storage.NewDBStatusStore(db)
228285
c.defaultExperimentStore = storage.NewDefaultExperimentStore(db)
229286
glog.Info("Initializing Object store client...")
@@ -573,11 +630,11 @@ func initLogArchive() (logArchive archive.LogArchiveInterface) {
573630
}
574631

575632
// NewClientManager creates and Init a new instance of ClientManager.
576-
func NewClientManager() (ClientManager, error) {
577-
clientManager := ClientManager{}
578-
err := clientManager.init()
633+
func NewClientManager(options *Options) (*ClientManager, error) {
634+
clientManager := &ClientManager{}
635+
err := clientManager.init(options)
579636
if err != nil {
580-
return ClientManager{}, err
637+
return nil, err
581638
}
582639

583640
return clientManager, nil

0 commit comments

Comments
 (0)