From 81ea48fd29be3a055d39e04c84dc2e96f070c94c Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 9 Jul 2025 16:15:31 +0800 Subject: [PATCH 1/3] apikeyauthextension: propagate status code in errs Updates the extension to return a `status.Error` type (which wraps the `status.Status` type) instead of a plain `error` type. This allows propagating the status code and message from the extension to the collector. This is similar to what the upstream OTLP receiver does: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.129.0/receiver/otlpreceiver/otlphttp.go#L183-L192 Signed-off-by: Marc Lopez Rubio --- .../apikeyauthextension/authenticator.go | 30 ++++++++++++------- .../apikeyauthextension/authenticator_test.go | 16 +++++----- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/extension/apikeyauthextension/authenticator.go b/extension/apikeyauthextension/authenticator.go index 71d7e9ec..288aa33b 100644 --- a/extension/apikeyauthextension/authenticator.go +++ b/extension/apikeyauthextension/authenticator.go @@ -32,6 +32,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/extensionauth" "golang.org/x/crypto/pbkdf2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/typedapi/security/hasprivileges" @@ -225,12 +227,12 @@ func (a *authenticator) getCacheKey(id string, headers map[string][]string) (str func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) { authHeaderValue, id, err := a.parseAuthorizationHeader(headers) if err != nil { - return ctx, err + return ctx, status.Error(codes.Unauthenticated, err.Error()) } cacheKey, err := a.getCacheKey(id, headers) if err != nil { - return ctx, err + return ctx, status.Error(codes.Internal, err.Error()) } derivedKey := pbkdf2.Key( @@ -245,19 +247,21 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s // Client has specified an API Key with a colliding ID, // but whose secret component does not match the one in // the cache. - return ctx, fmt.Errorf("API Key %q unauthorized", id) + return ctx, status.Errorf(codes.InvalidArgument, + "API Key %q unauthorized", id, + ) } if cacheEntry.err != nil { - return ctx, cacheEntry.err + return ctx, status.Error(codes.Unauthenticated, cacheEntry.err.Error()) } - clientInfo := client.FromContext(ctx) - clientInfo.Auth = cacheEntry.data - return client.NewContext(ctx, clientInfo), nil + return newCtxWithAuthData(ctx, cacheEntry.data), nil } hasPrivileges, username, err := a.hasPrivileges(ctx, authHeaderValue) if err != nil { - return ctx, err + return ctx, status.Error(codes.Internal, fmt.Sprintf( + "error checking privileges for API Key %q: %v", id, err, + )) } if !hasPrivileges { cacheEntry := &cacheEntry{ @@ -265,7 +269,7 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s err: fmt.Errorf("API Key %q unauthorized", id), } a.cache.Add(cacheKey, cacheEntry) - return ctx, cacheEntry.err + return ctx, status.Error(codes.PermissionDenied, cacheEntry.err.Error()) } cacheEntry := &cacheEntry{ key: derivedKey, @@ -275,7 +279,11 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s }, } a.cache.Add(cacheKey, cacheEntry) + return newCtxWithAuthData(ctx, cacheEntry.data), nil +} + +func newCtxWithAuthData(ctx context.Context, authData *authData) context.Context { clientInfo := client.FromContext(ctx) - clientInfo.Auth = cacheEntry.data - return client.NewContext(ctx, clientInfo), nil + clientInfo.Auth = authData + return client.NewContext(ctx, clientInfo) } diff --git a/extension/apikeyauthextension/authenticator_test.go b/extension/apikeyauthextension/authenticator_test.go index a70001c0..f4c8a52f 100644 --- a/extension/apikeyauthextension/authenticator_test.go +++ b/extension/apikeyauthextension/authenticator_test.go @@ -74,11 +74,11 @@ func TestAuthenticator(t *testing.T) { }, Status: 400, }), - expectedErr: `status: 400, failed: [a_type], reason: a_reason`, + expectedErr: `rpc error: code = Internal desc = error checking privileges for API Key "id": status: 400, failed: [a_type], reason: a_reason`, }, "missing_privileges": { handler: newCannedHasPrivilegesHandler(hasprivileges.Response{HasAllRequested: false}), - expectedErr: `API Key "id" unauthorized`, + expectedErr: `rpc error: code = PermissionDenied desc = API Key "id" unauthorized`, }, } { t.Run(name, func(t *testing.T) { @@ -170,7 +170,7 @@ func TestAuthenticator_Caching(t *testing.T) { _, err = authenticator.Authenticate(context.Background(), map[string][]string{ "Authorization": {"ApiKey " + base64.StdEncoding.EncodeToString([]byte("id2:secret2"))}, }) - assert.EqualError(t, err, `API Key "id2" unauthorized`) + assert.EqualError(t, err, `rpc error: code = InvalidArgument desc = API Key "id2" unauthorized`) } func TestAuthenticator_CacheKeyHeaders(t *testing.T) { @@ -183,7 +183,7 @@ func TestAuthenticator_CacheKeyHeaders(t *testing.T) { _, err := authenticator.Authenticate(context.Background(), map[string][]string{ "Authorization": {"ApiKey " + base64.StdEncoding.EncodeToString([]byte("id1:secret1"))}, }) - require.EqualError(t, err, `error computing cache key: missing header "X-Tenant-Id"`) + require.EqualError(t, err, `rpc error: code = Internal desc = error computing cache key: missing header "X-Tenant-Id"`) ctx, err := authenticator.Authenticate(context.Background(), map[string][]string{ "X-Tenant-Id": {"tenant1"}, @@ -259,7 +259,7 @@ func TestAuthenticator_AuthorizationHeader(t *testing.T) { }, "missing_header": { headers: map[string][]string{}, - expectedErr: `missing header "Authorization"`, + expectedErr: `rpc error: code = Unauthenticated desc = missing header "Authorization"`, }, "invalid_scheme": { headers: map[string][]string{ @@ -267,13 +267,13 @@ func TestAuthenticator_AuthorizationHeader(t *testing.T) { "Bearer " + base64.StdEncoding.EncodeToString([]byte("id:secret")), }, }, - expectedErr: `ApiKey prefix not found`, + expectedErr: `rpc error: code = Unauthenticated desc = ApiKey prefix not found`, }, "invalid_base64": { headers: map[string][]string{ "Authorization": {"ApiKey not_base64"}, }, - expectedErr: "illegal base64 data at input byte 3", + expectedErr: "rpc error: code = Unauthenticated desc = illegal base64 data at input byte 3", }, "invalid_encoded_apikey": { headers: map[string][]string{ @@ -281,7 +281,7 @@ func TestAuthenticator_AuthorizationHeader(t *testing.T) { "ApiKey " + base64.StdEncoding.EncodeToString([]byte("junk")), }, }, - expectedErr: "invalid API Key", + expectedErr: "rpc error: code = Unauthenticated desc = invalid API Key", }, } { t.Run(name, func(t *testing.T) { From 64598467980d7dba2ac47f3004c8aaa7f5eb9cce Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 9 Jul 2025 16:28:33 +0800 Subject: [PATCH 2/3] Address review comments Signed-off-by: Marc Lopez Rubio --- extension/apikeyauthextension/authenticator.go | 12 ++++++++---- extension/apikeyauthextension/authenticator_test.go | 6 +++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/extension/apikeyauthextension/authenticator.go b/extension/apikeyauthextension/authenticator.go index 288aa33b..4c85ea22 100644 --- a/extension/apikeyauthextension/authenticator.go +++ b/extension/apikeyauthextension/authenticator.go @@ -224,6 +224,10 @@ func (a *authenticator) getCacheKey(id string, headers map[string][]string) (str // Authenticate validates an ApiKey scheme Authorization header, // passing it to Elasticsearch for checking privileges. +// +// Callers can use status.FromError(err) to get the status code +// and message from the returned error. If no status.Status is returned, +// the error should be considered an internal error. func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) { authHeaderValue, id, err := a.parseAuthorizationHeader(headers) if err != nil { @@ -232,7 +236,7 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s cacheKey, err := a.getCacheKey(id, headers) if err != nil { - return ctx, status.Error(codes.Internal, err.Error()) + return ctx, err } derivedKey := pbkdf2.Key( @@ -247,7 +251,7 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s // Client has specified an API Key with a colliding ID, // but whose secret component does not match the one in // the cache. - return ctx, status.Errorf(codes.InvalidArgument, + return ctx, status.Errorf(codes.Unauthenticated, "API Key %q unauthorized", id, ) } @@ -259,9 +263,9 @@ func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]s hasPrivileges, username, err := a.hasPrivileges(ctx, authHeaderValue) if err != nil { - return ctx, status.Error(codes.Internal, fmt.Sprintf( + return ctx, fmt.Errorf( "error checking privileges for API Key %q: %v", id, err, - )) + ) } if !hasPrivileges { cacheEntry := &cacheEntry{ diff --git a/extension/apikeyauthextension/authenticator_test.go b/extension/apikeyauthextension/authenticator_test.go index f4c8a52f..e94c649d 100644 --- a/extension/apikeyauthextension/authenticator_test.go +++ b/extension/apikeyauthextension/authenticator_test.go @@ -74,7 +74,7 @@ func TestAuthenticator(t *testing.T) { }, Status: 400, }), - expectedErr: `rpc error: code = Internal desc = error checking privileges for API Key "id": status: 400, failed: [a_type], reason: a_reason`, + expectedErr: `error checking privileges for API Key "id": status: 400, failed: [a_type], reason: a_reason`, }, "missing_privileges": { handler: newCannedHasPrivilegesHandler(hasprivileges.Response{HasAllRequested: false}), @@ -170,7 +170,7 @@ func TestAuthenticator_Caching(t *testing.T) { _, err = authenticator.Authenticate(context.Background(), map[string][]string{ "Authorization": {"ApiKey " + base64.StdEncoding.EncodeToString([]byte("id2:secret2"))}, }) - assert.EqualError(t, err, `rpc error: code = InvalidArgument desc = API Key "id2" unauthorized`) + assert.EqualError(t, err, `rpc error: code = Unauthenticated desc = API Key "id2" unauthorized`) } func TestAuthenticator_CacheKeyHeaders(t *testing.T) { @@ -183,7 +183,7 @@ func TestAuthenticator_CacheKeyHeaders(t *testing.T) { _, err := authenticator.Authenticate(context.Background(), map[string][]string{ "Authorization": {"ApiKey " + base64.StdEncoding.EncodeToString([]byte("id1:secret1"))}, }) - require.EqualError(t, err, `rpc error: code = Internal desc = error computing cache key: missing header "X-Tenant-Id"`) + require.EqualError(t, err, `error computing cache key: missing header "X-Tenant-Id"`) ctx, err := authenticator.Authenticate(context.Background(), map[string][]string{ "X-Tenant-Id": {"tenant1"}, From c773e84740d3f58a16e4e5a70c791d3e521a2c86 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 9 Jul 2025 16:32:51 +0800 Subject: [PATCH 3/3] go mod tidy Signed-off-by: Marc Lopez Rubio --- extension/apikeyauthextension/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/apikeyauthextension/go.mod b/extension/apikeyauthextension/go.mod index b2800dac..b62b17a5 100644 --- a/extension/apikeyauthextension/go.mod +++ b/extension/apikeyauthextension/go.mod @@ -17,6 +17,7 @@ require ( go.opentelemetry.io/collector/extension/extensiontest v0.129.0 go.uber.org/goleak v1.3.0 golang.org/x/crypto v0.39.0 + google.golang.org/grpc v1.73.0 ) require ( @@ -67,7 +68,6 @@ require ( golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect - google.golang.org/grpc v1.73.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect