diff --git a/README.md b/README.md index a4823ae..584df34 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,33 @@ This installation includes Prometheus and Grafana as well. 1. `kubectl apply -f scripts/kubernetes/prometheus/` 3. `kubectl apply -f scripts/kubernetes/grafana` +Note: Within container orchestration platforms you have to ensure that all interactions of the client and server are bound to the same Pod the initial request was sent to. This can be achieved by using a `sessionAffinity` which is set to ClientIP + +Example service definition: +```yaml +kind: Service +apiVersion: v1 +metadata: + labels: + app: prometheus-rsocket-proxy-server + # ... +spec: + ports: + - name: https + protocol: TCP + port: 8443 + targetPort: https + - name: rsocket + protocol: TCP + port: 7001 + targetPort: rsocket + selector: + app: prometheus-rsocket-proxy-server + type: ClusterIP + sessionAffinity: ClientIP +``` + + ## Expected performance A 3-pod deployment easily handles 1,000 connected application instances each serving 1,000 distinct time series with <1vCPU and <3Gi RAM total on GKE. diff --git a/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java b/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java index 48d1ca5..2238962 100644 --- a/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java +++ b/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java @@ -72,14 +72,15 @@ public class PrometheusRSocketClient { * * @param registryAndScrape the registry and scrape meter * @param transport the client transport - * @param retry the retry configuration + * @param reconnectRetry the reconnectRetry configuration * @param onKeyReceived the callback if a key has been received */ private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, ClientTransport transport, - Retry retry, + Retry reconnectRetry, Runnable onKeyReceived) { - this(registryAndScrape, transport, retry, Duration.ofSeconds(5), onKeyReceived); + this(registryAndScrape, transport, reconnectRetry, Retry.backoff(6, Duration.ofMillis(100)) + .maxBackoff(Duration.ofSeconds(5)), Duration.ofSeconds(5), onKeyReceived); } /** @@ -93,6 +94,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, */ private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, ClientTransport transport, + Retry reconnectRetry, Retry retry, Duration timeout, Runnable onKeyReceived) { @@ -103,7 +105,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, .reconnect(new Retry() { @Override public Publisher generateCompanion(Flux retrySignals) { - return retry.generateCompanion(retrySignals + return reconnectRetry.generateCompanion(retrySignals .doOnNext(retrySignal -> { Throwable failure = retrySignal.failure(); DistributionSummary.builder("prometheus.connection.retry") @@ -118,35 +120,57 @@ public Publisher generateCompanion(Flux retrySignals) { } }) .acceptor((payload, r) -> { + LOGGER.trace("Acceptor for requestResponse and fireAndForget has been set up."); this.sendingSocket = r; return Mono.just(new RSocket() { @Override public Mono requestResponse(Payload payload) { - PublicKey key = decodePublicKey(payload.getData()); - latestKey.set(key); - onKeyReceived.run(); - return Mono.fromCallable(() -> scrapePayload(key)); + try { + LOGGER.trace("Received public key from Prometheus proxy in requestResponse."); + PublicKey key = decodePublicKey(payload.getData()); + latestKey.set(key); + onKeyReceived.run(); + return Mono.fromCallable(() -> scrapePayload(key)); + } finally { + payload.release(); + } } @Override public Mono fireAndForget(Payload payload) { - latestKey.set(decodePublicKey(payload.getData())); - onKeyReceived.run(); - return Mono.empty(); + try { + LOGGER.trace("Received public key from Prometheus proxy in fireAndForget."); + latestKey.set(decodePublicKey(payload.getData())); + onKeyReceived.run(); + return Mono.empty(); + } finally { + payload.release(); + } } }); }) .connect(transport) - .doOnError(t -> Counter.builder("prometheus.connection.error") - .baseUnit("errors") - .tag("exception", t.getClass().getSimpleName() == null ? t.getClass().getName() : t.getClass().getSimpleName()) - .register(registryAndScrape.registry) - .increment()) + .doOnError(t -> { + LOGGER.trace("Failed to connect to Prometheus proxy.", t); + Counter.builder("prometheus.connection.error") + .baseUnit("errors") + .tag("exception", t.getClass().getSimpleName() == null ? t.getClass().getName() : t.getClass().getSimpleName()) + .register(registryAndScrape.registry) + .increment(); + }) + .doOnSuccess(connection -> { + LOGGER.trace("Successfully established connection to Prometheus proxy."); + Counter.builder("prometheus.connection.success") + .baseUnit("connections") + .register(registryAndScrape.registry) + .increment(); + }) .doOnNext(connection -> this.connection = connection) .flatMap(socket -> socket.onClose() - .map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819 + .then(Mono.fromCallable(() -> 1)) // https://github.com/rsocket/rsocket-java/issues/819 .onErrorReturn(1)) .repeat(() -> !requestedDisconnect) + .retryWhen(retry) .subscribe(); } @@ -299,9 +323,12 @@ public static class Builder { private MeterRegistryAndScrape registryAndScrape; private final ClientTransport clientTransport; - private Retry retry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10)) + private Retry reconnectRetry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10)) .maxBackoff(Duration.ofMinutes(10)); + private Retry retry = Retry.backoff(6, Duration.ofMillis(100)) + .maxBackoff(Duration.ofSeconds(5)); + private Duration timeout = Duration.ofSeconds(5); private Runnable onKeyReceived = () -> { @@ -312,6 +339,17 @@ Builder(M registry, Supplier scrape, ClientTra this.clientTransport = clientTransport; } + /** + * Configures the reconnectRetry for {@link PrometheusRSocketClient}. + * + * @param reconnectRetry the reconnectRetry configuration + * @return the {@link Builder} + */ + public Builder reconnectRetry(Retry reconnectRetry) { + this.reconnectRetry = reconnectRetry; + return this; + } + /** * Configures the retry for {@link PrometheusRSocketClient}. * @@ -366,6 +404,7 @@ public PrometheusRSocketClient connect(Duration timeout) { return new PrometheusRSocketClient( registryAndScrape, clientTransport, + reconnectRetry, retry, timeout, () -> { @@ -387,6 +426,8 @@ public PrometheusRSocketClient connectBlockingly() { /** * Connects the {@link PrometheusRSocketClient} blockingly with the given timeout. * + * @param timeout the timeout to wait for the connection to be established + * * @return the {@link PrometheusRSocketClient} */ public PrometheusRSocketClient connectBlockingly(Duration timeout) { @@ -395,6 +436,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) { PrometheusRSocketClient client = new PrometheusRSocketClient( registryAndScrape, clientTransport, + reconnectRetry, retry, timeout, () -> { diff --git a/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java b/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java index be71797..1db4c91 100644 --- a/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java +++ b/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java @@ -69,7 +69,7 @@ void reconnection() throws InterruptedException { CountDownLatch reconnectionAttemptLatch = new CountDownLatch(3); PrometheusRSocketClient.build(meterRegistry, local) - .retry(Retry.fixedDelay(10, Duration.ofMillis(5)).doAfterRetry(retry -> reconnectionAttemptLatch.countDown())) + .reconnectRetry(Retry.fixedDelay(10, Duration.ofMillis(5)).doAfterRetry(retry -> reconnectionAttemptLatch.countDown())) .connect(); assertThat(reconnectionAttemptLatch.await(1, SECONDS)).isTrue(); @@ -90,7 +90,7 @@ void doesntAttemptReconnectWhenPushAndClose() throws InterruptedException { .block(); PrometheusRSocketClient client = PrometheusRSocketClient.build(meterRegistry, serverTransport.clientTransport()) - .retry(Retry.max(3)) + .reconnectRetry(Retry.max(3)) .connect(); client.pushAndClose(); @@ -144,7 +144,7 @@ public Mono fireAndForget(Payload payload) { }, serverTransport.clientTransport() ) - .retry(Retry.max(0)) + .reconnectRetry(Retry.max(0)) .connectBlockingly(); assertThat(normalScrapeLatch.await(1, SECONDS)).isTrue(); @@ -200,7 +200,7 @@ public Mono requestResponse(Payload payload) { () -> "", serverTransport.clientTransport() ) - .retry(Retry.max(0)) + .reconnectRetry(Retry.max(0)) .timeout(Duration.ofSeconds(10)) .doOnKeyReceived(() -> { await(keyReceivedLatch); diff --git a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java index faaca93..dc7a0ff 100644 --- a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java +++ b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java @@ -39,8 +39,10 @@ public class PrometheusRSocketClientAutoConfiguration { @Bean(destroyMethod = "pushAndCloseBlockingly") PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterRegistry, PrometheusRSocketClientProperties properties) { return PrometheusRSocketClient.build(meterRegistry, properties.createClientTransport()) - .retry(Retry.backoff(properties.getMaxRetries(), properties.getFirstBackoff()) - .maxBackoff(properties.getMaxBackoff())) + .reconnectRetry(Retry.backoff(properties.getReconnectRetry().getMaxRetries(), properties.getReconnectRetry().getFirstBackoff()) + .maxBackoff(properties.getReconnectRetry().getMaxBackoff())) + .retry(Retry.backoff(properties.getRetry().getMaxRetries(), properties.getRetry().getFirstBackoff()) + .maxBackoff(properties.getRetry().getMaxBackoff())) .timeout(properties.getTimeout()) .connectBlockingly(); } diff --git a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java index d73791a..1493e24 100644 --- a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java +++ b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java @@ -20,6 +20,7 @@ import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.boot.convert.DurationUnit; import reactor.netty.http.client.HttpClient; import reactor.netty.tcp.TcpClient; @@ -41,19 +42,98 @@ public class PrometheusRSocketClientProperties { private int port = 7001; /** - * The maximum number of connection attempts to make. + * Retry configuration for the reconnect. */ - private long maxRetries = Long.MAX_VALUE; + @NestedConfigurationProperty + private ReconnectRetry reconnectRetry = new ReconnectRetry(); /** - * The first connection attempt backoff delay to apply, then grow exponentially. + * Retry configuration for the subscription. */ - private Duration firstBackoff = Duration.ofSeconds(10); + @NestedConfigurationProperty + private Retry retry = new Retry(); - /** - * The maximum connection attempt delay to apply despite exponential growth. - */ - private Duration maxBackoff = Duration.ofMinutes(10); + public static class ReconnectRetry { + /** + * The maximum number of connection attempts to make. + */ + private long maxRetries = Long.MAX_VALUE; + + /** + * The first connection attempt backoff delay to apply, then grow exponentially. + */ + private Duration firstBackoff = Duration.ofSeconds(10); + + /** + * The maximum connection attempt delay to apply despite exponential growth. + */ + private Duration maxBackoff = Duration.ofMinutes(10); + + public long getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(long maxRetries) { + this.maxRetries = maxRetries; + } + + public Duration getFirstBackoff() { + return firstBackoff; + } + + public void setFirstBackoff(Duration firstBackoff) { + this.firstBackoff = firstBackoff; + } + + public Duration getMaxBackoff() { + return maxBackoff; + } + + public void setMaxBackoff(Duration maxBackoff) { + this.maxBackoff = maxBackoff; + } + } + + public static class Retry { + /** + * The maximum number of connection attempts to make. + */ + private long maxRetries = 6; + + /** + * The first connection attempt backoff delay to apply, then grow exponentially. + */ + private Duration firstBackoff = Duration.ofMillis(100); + + /** + * The maximum connection attempt delay to apply despite exponential growth. + */ + private Duration maxBackoff = Duration.ofSeconds(5); + + public long getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(long maxRetries) { + this.maxRetries = maxRetries; + } + + public Duration getFirstBackoff() { + return firstBackoff; + } + + public void setFirstBackoff(Duration firstBackoff) { + this.firstBackoff = firstBackoff; + } + + public Duration getMaxBackoff() { + return maxBackoff; + } + + public void setMaxBackoff(Duration maxBackoff) { + this.maxBackoff = maxBackoff; + } + } /** * RSocket transport protocol. @@ -71,30 +151,6 @@ public class PrometheusRSocketClientProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration timeout = Duration.ofSeconds(5); - public long getMaxRetries() { - return maxRetries; - } - - public void setMaxRetries(long maxRetries) { - this.maxRetries = maxRetries; - } - - public Duration getFirstBackoff() { - return firstBackoff; - } - - public void setFirstBackoff(Duration firstBackoff) { - this.firstBackoff = firstBackoff; - } - - public Duration getMaxBackoff() { - return maxBackoff; - } - - public void setMaxBackoff(Duration maxBackoff) { - this.maxBackoff = maxBackoff; - } - public String getHost() { return host; } @@ -135,6 +191,22 @@ public void setTimeout(Duration timeout) { this.timeout = timeout; } + public ReconnectRetry getReconnectRetry() { + return reconnectRetry; + } + + public void setReconnectRetry(ReconnectRetry reconnectRetry) { + this.reconnectRetry = reconnectRetry; + } + + public Retry getRetry() { + return retry; + } + + public void setRetry(Retry retry) { + this.retry = retry; + } + ClientTransport createClientTransport() { final TcpClient tcpClient = TcpClient.create().host(this.host).port(this.port); return this.transport.create(this.secure ? tcpClient.secure() : tcpClient);