Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,33 @@ This installation includes Prometheus and Grafana as well.
1. `kubectl apply -f scripts/kubernetes/prometheus/`
3. `kubectl apply -f scripts/kubernetes/grafana`

Note: Within container orchestration platforms you have to ensure that all interactions of the client and server are bound to the same Pod the initial request was sent to. This can be achieved by using a `sessionAffinity` which is set to ClientIP

Example service definition:
```yaml
kind: Service
apiVersion: v1
metadata:
labels:
app: prometheus-rsocket-proxy-server
# ...
spec:
ports:
- name: https
protocol: TCP
port: 8443
targetPort: https
- name: rsocket
protocol: TCP
port: 7001
targetPort: rsocket
selector:
app: prometheus-rsocket-proxy-server
type: ClusterIP
sessionAffinity: ClientIP
```


## Expected performance

A 3-pod deployment easily handles 1,000 connected application instances each serving 1,000 distinct time series with <1vCPU and <3Gi RAM total on GKE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ public class PrometheusRSocketClient {
*
* @param registryAndScrape the registry and scrape meter
* @param transport the client transport
* @param retry the retry configuration
* @param reconnectRetry the reconnectRetry configuration
* @param onKeyReceived the callback if a key has been received
*/
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
ClientTransport transport,
Retry retry,
Retry reconnectRetry,
Runnable onKeyReceived) {
this(registryAndScrape, transport, retry, Duration.ofSeconds(5), onKeyReceived);
this(registryAndScrape, transport, reconnectRetry, Retry.backoff(6, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5)), Duration.ofSeconds(5), onKeyReceived);
}

/**
Expand All @@ -93,6 +94,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
*/
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
ClientTransport transport,
Retry reconnectRetry,
Retry retry,
Duration timeout,
Runnable onKeyReceived) {
Expand All @@ -103,7 +105,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
.reconnect(new Retry() {
@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
return retry.generateCompanion(retrySignals
return reconnectRetry.generateCompanion(retrySignals
.doOnNext(retrySignal -> {
Throwable failure = retrySignal.failure();
DistributionSummary.builder("prometheus.connection.retry")
Expand All @@ -118,35 +120,57 @@ public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
}
})
.acceptor((payload, r) -> {
LOGGER.trace("Acceptor for requestResponse and fireAndForget has been set up.");
this.sendingSocket = r;
return Mono.just(new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
PublicKey key = decodePublicKey(payload.getData());
latestKey.set(key);
onKeyReceived.run();
return Mono.fromCallable(() -> scrapePayload(key));
try {
LOGGER.trace("Received public key from Prometheus proxy in requestResponse.");
PublicKey key = decodePublicKey(payload.getData());
latestKey.set(key);
onKeyReceived.run();
return Mono.fromCallable(() -> scrapePayload(key));
} finally {
payload.release();
}
}

@Override
public Mono<Void> fireAndForget(Payload payload) {
latestKey.set(decodePublicKey(payload.getData()));
onKeyReceived.run();
return Mono.empty();
try {
LOGGER.trace("Received public key from Prometheus proxy in fireAndForget.");
latestKey.set(decodePublicKey(payload.getData()));
onKeyReceived.run();
return Mono.empty();
} finally {
payload.release();
}
}
});
})
.connect(transport)
.doOnError(t -> Counter.builder("prometheus.connection.error")
.baseUnit("errors")
.tag("exception", t.getClass().getSimpleName() == null ? t.getClass().getName() : t.getClass().getSimpleName())
.register(registryAndScrape.registry)
.increment())
.doOnError(t -> {
LOGGER.trace("Failed to connect to Prometheus proxy.", t);
Counter.builder("prometheus.connection.error")
.baseUnit("errors")
.tag("exception", t.getClass().getSimpleName() == null ? t.getClass().getName() : t.getClass().getSimpleName())
.register(registryAndScrape.registry)
.increment();
})
.doOnSuccess(connection -> {
LOGGER.trace("Successfully established connection to Prometheus proxy.");
Counter.builder("prometheus.connection.success")
.baseUnit("connections")
.register(registryAndScrape.registry)
.increment();
})
.doOnNext(connection -> this.connection = connection)
.flatMap(socket -> socket.onClose()
.map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819
.then(Mono.fromCallable(() -> 1)) // https://github.com/rsocket/rsocket-java/issues/819
.onErrorReturn(1))
.repeat(() -> !requestedDisconnect)
.retryWhen(retry)
.subscribe();
}

Expand Down Expand Up @@ -299,9 +323,12 @@ public static class Builder {
private MeterRegistryAndScrape<?> registryAndScrape;
private final ClientTransport clientTransport;

private Retry retry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10))
private Retry reconnectRetry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10))
.maxBackoff(Duration.ofMinutes(10));

private Retry retry = Retry.backoff(6, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5));

private Duration timeout = Duration.ofSeconds(5);

private Runnable onKeyReceived = () -> {
Expand All @@ -312,6 +339,17 @@ <M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTra
this.clientTransport = clientTransport;
}

/**
* Configures the reconnectRetry for {@link PrometheusRSocketClient}.
*
* @param reconnectRetry the reconnectRetry configuration
* @return the {@link Builder}
*/
public Builder reconnectRetry(Retry reconnectRetry) {
this.reconnectRetry = reconnectRetry;
return this;
}

/**
* Configures the retry for {@link PrometheusRSocketClient}.
*
Expand Down Expand Up @@ -366,6 +404,7 @@ public PrometheusRSocketClient connect(Duration timeout) {
return new PrometheusRSocketClient(
registryAndScrape,
clientTransport,
reconnectRetry,
retry,
timeout,
() -> {
Expand All @@ -387,6 +426,8 @@ public PrometheusRSocketClient connectBlockingly() {
/**
* Connects the {@link PrometheusRSocketClient} blockingly with the given timeout.
*
* @param timeout the timeout to wait for the connection to be established
*
* @return the {@link PrometheusRSocketClient}
*/
public PrometheusRSocketClient connectBlockingly(Duration timeout) {
Expand All @@ -395,6 +436,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
PrometheusRSocketClient client = new PrometheusRSocketClient(
registryAndScrape,
clientTransport,
reconnectRetry,
retry,
timeout,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void reconnection() throws InterruptedException {
CountDownLatch reconnectionAttemptLatch = new CountDownLatch(3);

PrometheusRSocketClient.build(meterRegistry, local)
.retry(Retry.fixedDelay(10, Duration.ofMillis(5)).doAfterRetry(retry -> reconnectionAttemptLatch.countDown()))
.reconnectRetry(Retry.fixedDelay(10, Duration.ofMillis(5)).doAfterRetry(retry -> reconnectionAttemptLatch.countDown()))
.connect();

assertThat(reconnectionAttemptLatch.await(1, SECONDS)).isTrue();
Expand All @@ -90,7 +90,7 @@ void doesntAttemptReconnectWhenPushAndClose() throws InterruptedException {
.block();

PrometheusRSocketClient client = PrometheusRSocketClient.build(meterRegistry, serverTransport.clientTransport())
.retry(Retry.max(3))
.reconnectRetry(Retry.max(3))
.connect();

client.pushAndClose();
Expand Down Expand Up @@ -144,7 +144,7 @@ public Mono<Void> fireAndForget(Payload payload) {
},
serverTransport.clientTransport()
)
.retry(Retry.max(0))
.reconnectRetry(Retry.max(0))
.connectBlockingly();

assertThat(normalScrapeLatch.await(1, SECONDS)).isTrue();
Expand Down Expand Up @@ -200,7 +200,7 @@ public Mono<Payload> requestResponse(Payload payload) {
() -> "",
serverTransport.clientTransport()
)
.retry(Retry.max(0))
.reconnectRetry(Retry.max(0))
.timeout(Duration.ofSeconds(10))
.doOnKeyReceived(() -> {
await(keyReceivedLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public class PrometheusRSocketClientAutoConfiguration {
@Bean(destroyMethod = "pushAndCloseBlockingly")
PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterRegistry, PrometheusRSocketClientProperties properties) {
return PrometheusRSocketClient.build(meterRegistry, properties.createClientTransport())
.retry(Retry.backoff(properties.getMaxRetries(), properties.getFirstBackoff())
.maxBackoff(properties.getMaxBackoff()))
.reconnectRetry(Retry.backoff(properties.getReconnectRetry().getMaxRetries(), properties.getReconnectRetry().getFirstBackoff())
.maxBackoff(properties.getReconnectRetry().getMaxBackoff()))
.retry(Retry.backoff(properties.getRetry().getMaxRetries(), properties.getRetry().getFirstBackoff())
.maxBackoff(properties.getRetry().getMaxBackoff()))
.timeout(properties.getTimeout())
.connectBlockingly();
}
Expand Down
Loading