Skip to content

Commit 5870906

Browse files
authored
fix(backend): Return from pipeline deletions after the cache is updated (#12153)
In Kubernetes native API mode, the REST API could delete a pipeline version but the cache could not have updated by the time the API call returns. This adds a 3 second or less polling to ensure the cache gets updated for a smooth user experience. Signed-off-by: mprahl <[email protected]>
1 parent 85629a1 commit 5870906

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

backend/src/apiserver/storage/pipeline_store_kubernetes.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"slices"
77
"sort"
88
"strings"
9+
"time"
910

1011
"github.com/golang/glog"
1112
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
@@ -21,6 +22,8 @@ import (
2122
"github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
2223
)
2324

25+
const deletionPollTimeout = 3 * time.Second
26+
2427
var (
2528
ErrNoV1 = errors.New("the v1 API is not available for the Kubernetes pipeline store")
2629
ErrUnsupportedField = errors.New("the field is unsupported")
@@ -175,7 +178,7 @@ func (k *PipelineStoreKubernetes) DeletePipeline(pipelineId string) error {
175178
return util.NewInternalServerError(err, "Failed to delete the pipeline")
176179
}
177180

178-
return nil
181+
return k.deleteWithTimeout(k8sPipeline.Namespace, k8sPipeline.Name, &v2beta1.Pipeline{})
179182
}
180183

181184
func (k *PipelineStoreKubernetes) CreatePipelineAndPipelineVersion(pipeline *model.Pipeline, pipelineVersion *model.PipelineVersion) (*model.Pipeline, *model.PipelineVersion, error) {
@@ -447,7 +450,30 @@ func (k *PipelineStoreKubernetes) DeletePipelineVersion(pipelineVersionId string
447450
return util.NewInternalServerError(err, "Failed to delete the pipeline version")
448451
}
449452

450-
return nil
453+
return k.deleteWithTimeout(k8sPipelineVersion.Namespace, k8sPipelineVersion.Name, &v2beta1.PipelineVersion{})
454+
}
455+
456+
// deleteWithTimeout polls until the given namespaced resource is NotFound or the timeout expires.
457+
func (k *PipelineStoreKubernetes) deleteWithTimeout(namespace string, name string, exampleObject ctrlclient.Object) error {
458+
ctx, cancel := context.WithTimeout(context.Background(), deletionPollTimeout)
459+
defer cancel()
460+
461+
for {
462+
select {
463+
case <-ctx.Done():
464+
// No need to return an error here, because the resource is deleted, it's just the cache hasn't updated yet.
465+
return nil
466+
default:
467+
err := k.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, exampleObject)
468+
if k8serrors.IsNotFound(err) {
469+
return nil
470+
}
471+
if err != nil {
472+
return util.NewInternalServerError(err, "failed to check deletion status")
473+
}
474+
time.Sleep(100 * time.Millisecond)
475+
}
476+
}
451477
}
452478

453479
func (k *PipelineStoreKubernetes) getK8sPipeline(pipelineId string) (*v2beta1.Pipeline, error) {

0 commit comments

Comments
 (0)