Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
KUBEBUILDER_ASSETS?="$(shell $(ENVTEST) use -i $(ENVTEST_KUBERNETES_VERSION) --bin-dir=$(ENVTEST_ASSETS_DIR) -p path)"

.PHONY: test
test: manifests generate generate-mocks fmt vet test-setup ## Run tests
test: manifests generate generate-mocks fmt vet test-setup goimports lint ## Run tests
KUBEBUILDER_ASSETS=$(KUBEBUILDER_ASSETS) go test ./... -coverprofile cover.out -covermode=atomic

test-setup: setup-envtest ## Ensure test environment has been downloaded
Expand Down Expand Up @@ -117,7 +117,7 @@ eks-test:
##@ Build

.DEFAULT: build
build: test goimports lint ## Build manager binary.
build: test ## Build manager binary.
go build -ldflags="-s -w -X ${PKG}.GitVersion=${GIT_TAG} -X ${PKG}.GitCommit=${GIT_COMMIT}" -o bin/manager main.go

run: test ## Run a controller from your host.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/gomega v1.20.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
golang.org/x/time v0.1.0
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
k8s.io/client-go v0.24.2
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,9 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,24 @@ func main() {

if err = (&multiclustercontrollers.ServiceExportReconciler{
Client: mgr.GetClient(),
Log: common.NewLogger("controllers", "ServiceExport"),
Log: common.NewLogger("controllers", "ServiceExportReconciler"),
Scheme: mgr.GetScheme(),
CloudMap: serviceDiscoveryClient,
ClusterUtils: clusterUtils,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "ServiceExport")
log.Error(err, "unable to create controller", "controller", "ServiceExportReconciler")
os.Exit(1)
}

cloudMapReconciler := &multiclustercontrollers.CloudMapReconciler{
Client: mgr.GetClient(),
Cloudmap: serviceDiscoveryClient,
Log: common.NewLogger("controllers", "Cloudmap"),
Log: common.NewLogger("controllers", "CloudmapReconciler"),
ClusterUtils: clusterUtils,
}

if err = mgr.Add(cloudMapReconciler); err != nil {
log.Error(err, "unable to create controller", "controller", "CloudMap")
log.Error(err, "unable to create controller", "controller", "CloudmapReconciler")
os.Exit(1)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 28 additions & 4 deletions pkg/cloudmap/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
Expand Down Expand Up @@ -52,19 +55,30 @@ type ServiceDiscoveryApi interface {
}

type serviceDiscoveryApi struct {
log common.Logger
awsFacade AwsFacade
log common.Logger
awsFacade AwsFacade
nsRateLimiter *rate.Limiter
svcRateLimiter *rate.Limiter
opRateLimiter *rate.Limiter
}

// NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config.
func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi {
return &serviceDiscoveryApi{
log: common.NewLogger("cloudmap"),
awsFacade: NewAwsFacadeFromConfig(cfg),
log: common.NewLogger("cloudmap", "api"),
awsFacade: NewAwsFacadeFromConfig(cfg),
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 5), // 1 per second
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 10), // 2 per second
opRateLimiter: rate.NewLimiter(rate.Every(100*time.Second), 200), // 100 per second
}
}

func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[string]*model.Namespace, error) {
err := sdApi.nsRateLimiter.Wait(ctx)
if err != nil {
return nil, err
}

namespaceMap := make(map[string]*model.Namespace)

pages := sd.NewListNamespacesPaginator(sdApi.awsFacade, &sd.ListNamespacesInput{})
Expand All @@ -91,6 +105,11 @@ func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[stri
}

func (sdApi *serviceDiscoveryApi) GetServiceIdMap(ctx context.Context, nsId string) (map[string]string, error) {
err := sdApi.svcRateLimiter.Wait(ctx)
if err != nil {
return nil, err
}

serviceIdMap := make(map[string]string)

filter := types.ServiceFilter{
Expand Down Expand Up @@ -155,6 +174,11 @@ func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters
}

func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
err = sdApi.opRateLimiter.Wait(ctx)
if err != nil {
return nil, err
}

opResp, err := sdApi.awsFacade.GetOperation(ctx, &sd.GetOperationInput{OperationId: &opId})

if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/cloudmap/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"errors"
"fmt"
"testing"
"time"

"golang.org/x/time/rate"

aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"

Expand Down Expand Up @@ -336,7 +339,10 @@ func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade)
scheme := runtime.NewScheme()
scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{})
return &serviceDiscoveryApi{
log: common.NewLoggerWithLogr(testr.New(t)),
awsFacade: awsFacade,
log: common.NewLoggerWithLogr(testr.New(t)),
awsFacade: awsFacade,
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 2), // 1 per second
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 4), // 2 per second
opRateLimiter: rate.NewLimiter(rate.Every(10*time.Second), 100), // 10 per second
}
}
12 changes: 11 additions & 1 deletion pkg/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"

aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/cloudmap"
Expand All @@ -12,9 +15,11 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -295,6 +300,11 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
&source.Kind{Type: &aboutv1alpha1.ClusterProperty{}},
handler.EnqueueRequestsFromMapFunc(r.clusterPropertyMappingFunction()),
).
WithOptions(controller.Options{
// rate-limiting is applied to reconcile responses with an error
// We are increasing the base delay to 500ms, defaults baseDelay: 5ms, maxDelay: 1000s
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
}).
Complete(r)
}

Expand Down