Skip to content

Commit b967f37

Browse files
committed
feat(operator): add namespaced operators
Signed-off-by: Christoph Bühler <[email protected]>
1 parent fb06960 commit b967f37

File tree

5 files changed

+109
-6
lines changed

5 files changed

+109
-6
lines changed

examples/Operator/todos.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
- other CLI commands
33
- build targets
44
- error handling
5-
- namespaced operator
65
- web: webhooks
76
- docs
87
- try .net 8 AOT?

src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using k8s.LeaderElection;
33
using k8s.Models;
44

5+
using KubeOps.Abstractions.Builder;
56
using KubeOps.KubernetesClient;
67
using KubeOps.Operator.Queue;
78

@@ -20,8 +21,9 @@ public LeaderAwareResourceWatcher(
2021
IServiceProvider provider,
2122
IKubernetesClient<TEntity> client,
2223
TimedEntityQueue<TEntity> queue,
24+
OperatorSettings settings,
2325
LeaderElector elector)
24-
: base(logger, provider, client, queue)
26+
: base(logger, provider, client, queue, settings)
2527
{
2628
_logger = logger;
2729
_elector = elector;

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using k8s;
66
using k8s.Models;
77

8+
using KubeOps.Abstractions.Builder;
89
using KubeOps.Abstractions.Controller;
910
using KubeOps.Abstractions.Finalizer;
1011
using KubeOps.KubernetesClient;
@@ -24,6 +25,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
2425
private readonly IServiceProvider _provider;
2526
private readonly IKubernetesClient<TEntity> _client;
2627
private readonly TimedEntityQueue<TEntity> _queue;
28+
private readonly OperatorSettings _settings;
2729
private readonly ConcurrentDictionary<string, long> _entityCache = new();
2830
private readonly Lazy<List<FinalizerRegistration>> _finalizers;
2931
private bool _stopped;
@@ -34,12 +36,14 @@ public ResourceWatcher(
3436
ILogger<ResourceWatcher<TEntity>> logger,
3537
IServiceProvider provider,
3638
IKubernetesClient<TEntity> client,
37-
TimedEntityQueue<TEntity> queue)
39+
TimedEntityQueue<TEntity> queue,
40+
OperatorSettings settings)
3841
{
3942
_logger = logger;
4043
_provider = provider;
4144
_client = client;
4245
_queue = queue;
46+
_settings = settings;
4347
_finalizers = new(() => _provider.GetServices<FinalizerRegistration>().ToList());
4448
}
4549

@@ -77,7 +81,7 @@ private void WatchResource()
7781
}
7882
}
7983

80-
_watcher = _client.Watch(OnEvent, OnError, OnClosed);
84+
_watcher = _client.Watch(OnEvent, OnError, OnClosed, @namespace: _settings.Namespace);
8185
}
8286

8387
private void StopWatching()

test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ public EventPublisherIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder
2929
[Fact]
3030
public async Task Should_Create_New_Event()
3131
{
32-
const string eventName = "test-entity.default.REASON.message.Normal";
32+
const string eventName = "single-entity.default.REASON.message.Normal";
3333
var encodedEventName =
3434
Convert.ToHexString(
3535
SHA512.HashData(
3636
Encoding.UTF8.GetBytes(eventName)));
3737

38-
await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default"));
38+
await _client.CreateAsync(new V1IntegrationTestEntity("single-entity", "username", "default"));
3939
await Mock.WaitForInvocations;
4040

4141
var eventClient = _hostBuilder.Services.GetRequiredService<IKubernetesClient<Corev1Event>>();
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using FluentAssertions;
2+
3+
using k8s;
4+
using k8s.Models;
5+
6+
using KubeOps.Abstractions.Controller;
7+
using KubeOps.KubernetesClient;
8+
using KubeOps.Operator.Test.TestEntities;
9+
using KubeOps.Transpiler;
10+
11+
using Microsoft.Extensions.DependencyInjection;
12+
13+
namespace KubeOps.Operator.Test;
14+
15+
public class NamespacedOperatorIntegrationTest : IntegrationTestBase, IAsyncLifetime
16+
{
17+
private static readonly InvocationCounter<V1IntegrationTestEntity> Mock = new();
18+
private IKubernetesClient<V1IntegrationTestEntity> _client = null!;
19+
private IKubernetesClient<V1Namespace> _nsClient = null!;
20+
21+
public NamespacedOperatorIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder)
22+
{
23+
Mock.Clear();
24+
}
25+
26+
[Fact]
27+
public async Task Should_Call_Reconcile_On_Entity_In_Namespace()
28+
{
29+
var watcherCounter = new InvocationCounter<V1IntegrationTestEntity> { TargetInvocationCount = 2 };
30+
using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e));
31+
32+
await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "foobar"));
33+
await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default"));
34+
await Mock.WaitForInvocations;
35+
await watcherCounter.WaitForInvocations;
36+
Mock.Invocations.Count.Should().Be(1);
37+
watcherCounter.Invocations.Count.Should().Be(2);
38+
}
39+
40+
[Fact]
41+
public async Task Should_Not_Call_Reconcile_On_Entity_In_Other_Namespace()
42+
{
43+
var watcherCounter = new InvocationCounter<V1IntegrationTestEntity> { TargetInvocationCount = 1 };
44+
using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e));
45+
46+
await _client.CreateAsync(new V1IntegrationTestEntity("test-entity2", "username", "default"));
47+
await watcherCounter.WaitForInvocations;
48+
Mock.Invocations.Count.Should().Be(0);
49+
watcherCounter.Invocations.Count.Should().Be(1);
50+
}
51+
52+
public async Task InitializeAsync()
53+
{
54+
var meta = Entities.ToEntityMetadata(typeof(V1IntegrationTestEntity)).Metadata;
55+
_client = new KubernetesClient<V1IntegrationTestEntity>(meta);
56+
_nsClient = new KubernetesClient<V1Namespace>(new(V1Namespace.KubeKind, V1Namespace.KubeApiVersion,
57+
V1Namespace.KubeGroup, V1Namespace.KubePluralName));
58+
await _nsClient.SaveAsync(new V1Namespace(metadata: new(name: "foobar")).Initialize());
59+
await _hostBuilder.ConfigureAndStart(builder => builder.Services
60+
.AddSingleton(Mock)
61+
.AddKubernetesOperator(s => s.Namespace = "foobar")
62+
.AddControllerWithEntity<TestController, V1IntegrationTestEntity>(meta));
63+
}
64+
65+
public async Task DisposeAsync()
66+
{
67+
var entities = await _client.ListAsync("default");
68+
await _nsClient.DeleteAsync("foobar");
69+
while (await _nsClient.GetAsync("foobar") is not null)
70+
{
71+
await Task.Delay(100);
72+
}
73+
await _client.DeleteAsync(entities);
74+
_client.Dispose();
75+
}
76+
77+
private class TestController : IEntityController<V1IntegrationTestEntity>
78+
{
79+
private readonly InvocationCounter<V1IntegrationTestEntity> _svc;
80+
81+
public TestController(InvocationCounter<V1IntegrationTestEntity> svc)
82+
{
83+
_svc = svc;
84+
}
85+
86+
public Task ReconcileAsync(V1IntegrationTestEntity entity)
87+
{
88+
_svc.Invocation(entity);
89+
return Task.CompletedTask;
90+
}
91+
92+
public Task DeletedAsync(V1IntegrationTestEntity entity)
93+
{
94+
_svc.Invocation(entity);
95+
return Task.CompletedTask;
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)