Skip to content

Commit 947a988

Browse files
authored
fix(backend): Add support for additional filters on the Kubernetes native API (#12189)
* Add support for additional filters on the Kubernetes native API This adds support for eq, neq, and in filters. Signed-off-by: mprahl <[email protected]> * Fix filter mapping in Kubernetes native API Signed-off-by: mprahl <[email protected]> --------- Signed-off-by: mprahl <[email protected]>
1 parent 93ec416 commit 947a988

File tree

4 files changed

+360
-61
lines changed

4 files changed

+360
-61
lines changed

backend/src/apiserver/filter/filter.go

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package filter
1919
import (
2020
"encoding/json"
2121
"fmt"
22+
"reflect"
2223
"strings"
2324

2425
"github.com/Masterminds/squirrel"
@@ -28,7 +29,7 @@ import (
2829
"github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
2930
)
3031

31-
const filterMessage = "Filter %v is not implemented for Kubernetes pipeline store. Only substring is supported."
32+
const filterMessage = "Filter %v is not implemented for Kubernetes pipeline store. Supported filters are eq, neq, in, and substring."
3233

3334
// Internal representation of a predicate.
3435
type Predicate struct {
@@ -211,52 +212,34 @@ func replaceMapKeys(m map[string][]interface{}, keyMap map[string]string, prefix
211212
}
212213

213214
func (f *Filter) FilterK8sPipelines(pipeline v2beta1.Pipeline) (bool, error) {
214-
if len(f.eq) > 0 {
215-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "eq"))
216-
}
217-
218-
if len(f.neq) > 0 {
219-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "neq"))
220-
}
221-
222-
if len(f.gt) > 0 {
223-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "gt"))
224-
}
225-
226-
if len(f.gte) > 0 {
227-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "gte"))
228-
}
229-
230-
if len(f.lt) > 0 {
231-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "lt"))
232-
}
233-
234-
if len(f.lte) > 0 {
235-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "lte"))
236-
}
215+
return f.matchesFilter(func(k string) interface{} { return pipeline.GetField(k) })
216+
}
237217

238-
if len(f.in) > 0 {
239-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "in"))
240-
}
218+
func (f *Filter) FilterK8sPipelineVersions(pipelineVersion v2beta1.PipelineVersion) (bool, error) {
219+
return f.matchesFilter(func(k string) interface{} { return pipelineVersion.GetField(k) })
220+
}
241221

242-
for k := range f.substring {
243-
for _, v := range f.substring[k] {
244-
if strings.Contains(fmt.Sprint(pipeline.GetField(k)), fmt.Sprint(v)) {
245-
return true, nil
222+
// matchesFilter applies client-side filtering with AND semantics across predicate groups.
223+
// The getField function should return the value of the given key on the object being filtered.
224+
func (f *Filter) matchesFilter(getField func(string) interface{}) (bool, error) {
225+
// EQ: all specified equals must hold.
226+
for k := range f.eq {
227+
fieldVal := fmt.Sprint(getField(k))
228+
for _, v := range f.eq[k] {
229+
if fieldVal != fmt.Sprint(v) {
230+
return false, nil
246231
}
247232
}
248233
}
249234

250-
return false, nil
251-
}
252-
253-
func (f *Filter) FilterK8sPipelineVersions(pipelineVersion v2beta1.PipelineVersion) (bool, error) {
254-
if len(f.eq) > 0 {
255-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "eq"))
256-
}
257-
258-
if len(f.neq) > 0 {
259-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "neq"))
235+
// NEQ: none of the specified values may match (NOT IN semantics).
236+
for k := range f.neq {
237+
fieldVal := fmt.Sprint(getField(k))
238+
for _, v := range f.neq[k] {
239+
if fieldVal == fmt.Sprint(v) {
240+
return false, nil
241+
}
242+
}
260243
}
261244

262245
if len(f.gt) > 0 {
@@ -275,19 +258,39 @@ func (f *Filter) FilterK8sPipelineVersions(pipelineVersion v2beta1.PipelineVersi
275258
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "lte"))
276259
}
277260

278-
if len(f.in) > 0 {
279-
return false, util.NewInvalidInputError("%s", fmt.Sprintf(filterMessage, "in"))
261+
// IN: field must be a member of all provided IN lists for the same key (AND semantics across lists)
262+
for k := range f.in {
263+
fieldVal := fmt.Sprint(getField(k))
264+
for _, list := range f.in[k] {
265+
inOne := false
266+
rv := reflect.ValueOf(list)
267+
if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array {
268+
return false, nil
269+
}
270+
for i := 0; i < rv.Len(); i++ {
271+
if fieldVal == fmt.Sprint(rv.Index(i).Interface()) {
272+
inOne = true
273+
break
274+
}
275+
}
276+
if !inOne {
277+
return false, nil
278+
}
279+
}
280280
}
281281

282+
// SUBSTRING: all specified substrings must be present.
282283
for k := range f.substring {
284+
fieldVal := fmt.Sprint(getField(k))
283285
for _, v := range f.substring[k] {
284-
if strings.Contains(fmt.Sprint(pipelineVersion.GetField(k)), fmt.Sprint(v)) {
285-
return true, nil
286+
if !strings.Contains(fieldVal, fmt.Sprint(v)) {
287+
return false, nil
286288
}
287289
}
288290
}
289291

290-
return false, nil
292+
// If no checks failed, the item matches the filter.
293+
return true, nil
291294
}
292295

293296
// AddToSelect builds a WHERE clause from the Filter f, adds it to the supplied

0 commit comments

Comments
 (0)