From ed3bd01b6c694ae5ae17f2d15013c3d5ff895a3d Mon Sep 17 00:00:00 2001 From: runakash Date: Mon, 7 Nov 2022 13:06:48 -0800 Subject: [PATCH] Add rate limiter to the Cloudmap API calls. Increasing the base delay of the service export reconciler's rate limiter. --- Makefile | 4 +-- go.mod | 2 +- go.sum | 3 +- main.go | 8 ++--- .../v1alpha1/zz_generated.deepcopy.go | 2 +- pkg/cloudmap/api.go | 32 ++++++++++++++++--- pkg/cloudmap/api_test.go | 10 ++++-- .../multicluster/serviceexport_controller.go | 12 ++++++- 8 files changed, 57 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index a9c1c127..2e8d5d65 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ ENVTEST_ASSETS_DIR=$(shell pwd)/testbin KUBEBUILDER_ASSETS?="$(shell $(ENVTEST) use -i $(ENVTEST_KUBERNETES_VERSION) --bin-dir=$(ENVTEST_ASSETS_DIR) -p path)" .PHONY: test -test: manifests generate generate-mocks fmt vet test-setup ## Run tests +test: manifests generate generate-mocks fmt vet test-setup goimports lint ## Run tests KUBEBUILDER_ASSETS=$(KUBEBUILDER_ASSETS) go test ./... -coverprofile cover.out -covermode=atomic test-setup: setup-envtest ## Ensure test environment has been downloaded @@ -117,7 +117,7 @@ eks-test: ##@ Build .DEFAULT: build -build: test goimports lint ## Build manager binary. +build: test ## Build manager binary. go build -ldflags="-s -w -X ${PKG}.GitVersion=${GIT_TAG} -X ${PKG}.GitCommit=${GIT_COMMIT}" -o bin/manager main.go run: test ## Run a controller from your host. diff --git a/go.mod b/go.mod index 0bbe01ca..900d7e33 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/gomega v1.20.2 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.1 + golang.org/x/time v0.1.0 k8s.io/api v0.24.3 k8s.io/apimachinery v0.24.3 k8s.io/client-go v0.24.2 @@ -80,7 +81,6 @@ require ( golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/go.sum b/go.sum index 1c38f71f..048f458b 100644 --- a/go.sum +++ b/go.sum @@ -774,8 +774,9 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= +golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index 2eebe4d4..0a075630 100644 --- a/main.go +++ b/main.go @@ -92,24 +92,24 @@ func main() { if err = (&multiclustercontrollers.ServiceExportReconciler{ Client: mgr.GetClient(), - Log: common.NewLogger("controllers", "ServiceExport"), + Log: common.NewLogger("controllers", "ServiceExportReconciler"), Scheme: mgr.GetScheme(), CloudMap: serviceDiscoveryClient, ClusterUtils: clusterUtils, }).SetupWithManager(mgr); err != nil { - log.Error(err, "unable to create controller", "controller", "ServiceExport") + log.Error(err, "unable to create controller", "controller", "ServiceExportReconciler") os.Exit(1) } cloudMapReconciler := &multiclustercontrollers.CloudMapReconciler{ Client: mgr.GetClient(), Cloudmap: serviceDiscoveryClient, - Log: common.NewLogger("controllers", "Cloudmap"), + Log: common.NewLogger("controllers", "CloudmapReconciler"), ClusterUtils: clusterUtils, } if err = mgr.Add(cloudMapReconciler); err != nil { - log.Error(err, "unable to create controller", "controller", "CloudMap") + log.Error(err, "unable to create controller", "controller", "CloudmapReconciler") os.Exit(1) } diff --git a/pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go index f74564df..1eca515f 100644 --- a/pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go @@ -23,7 +23,7 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) diff --git a/pkg/cloudmap/api.go b/pkg/cloudmap/api.go index 977aa2bb..04e169af 100644 --- a/pkg/cloudmap/api.go +++ b/pkg/cloudmap/api.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "time" + + "golang.org/x/time/rate" "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common" "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model" @@ -52,19 +55,30 @@ type ServiceDiscoveryApi interface { } type serviceDiscoveryApi struct { - log common.Logger - awsFacade AwsFacade + log common.Logger + awsFacade AwsFacade + nsRateLimiter *rate.Limiter + svcRateLimiter *rate.Limiter + opRateLimiter *rate.Limiter } // NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config. func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi { return &serviceDiscoveryApi{ - log: common.NewLogger("cloudmap"), - awsFacade: NewAwsFacadeFromConfig(cfg), + log: common.NewLogger("cloudmap", "api"), + awsFacade: NewAwsFacadeFromConfig(cfg), + nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 5), // 1 per second + svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 10), // 2 per second + opRateLimiter: rate.NewLimiter(rate.Every(100*time.Second), 200), // 100 per second } } func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[string]*model.Namespace, error) { + err := sdApi.nsRateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + namespaceMap := make(map[string]*model.Namespace) pages := sd.NewListNamespacesPaginator(sdApi.awsFacade, &sd.ListNamespacesInput{}) @@ -91,6 +105,11 @@ func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[stri } func (sdApi *serviceDiscoveryApi) GetServiceIdMap(ctx context.Context, nsId string) (map[string]string, error) { + err := sdApi.svcRateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + serviceIdMap := make(map[string]string) filter := types.ServiceFilter{ @@ -155,6 +174,11 @@ func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters } func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) { + err = sdApi.opRateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + opResp, err := sdApi.awsFacade.GetOperation(ctx, &sd.GetOperationInput{OperationId: &opId}) if err != nil { diff --git a/pkg/cloudmap/api_test.go b/pkg/cloudmap/api_test.go index a898374d..d2b210f5 100644 --- a/pkg/cloudmap/api_test.go +++ b/pkg/cloudmap/api_test.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "testing" + "time" + + "golang.org/x/time/rate" aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1" @@ -336,7 +339,10 @@ func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade) scheme := runtime.NewScheme() scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{}) return &serviceDiscoveryApi{ - log: common.NewLoggerWithLogr(testr.New(t)), - awsFacade: awsFacade, + log: common.NewLoggerWithLogr(testr.New(t)), + awsFacade: awsFacade, + nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 2), // 1 per second + svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 4), // 2 per second + opRateLimiter: rate.NewLimiter(rate.Every(10*time.Second), 100), // 10 per second } } diff --git a/pkg/controllers/multicluster/serviceexport_controller.go b/pkg/controllers/multicluster/serviceexport_controller.go index 5bea631a..f49a5878 100644 --- a/pkg/controllers/multicluster/serviceexport_controller.go +++ b/pkg/controllers/multicluster/serviceexport_controller.go @@ -3,6 +3,9 @@ package controllers import ( "context" "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/errors" aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1" "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/cloudmap" @@ -12,9 +15,11 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -295,6 +300,11 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error { &source.Kind{Type: &aboutv1alpha1.ClusterProperty{}}, handler.EnqueueRequestsFromMapFunc(r.clusterPropertyMappingFunction()), ). + WithOptions(controller.Options{ + // rate-limiting is applied to reconcile responses with an error + // We are increasing the base delay to 500ms, defaults baseDelay: 5ms, maxDelay: 1000s + RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second), + }). Complete(r) }