diff --git a/dd-java-agent/instrumentation/resilience4j/build.gradle b/dd-java-agent/instrumentation/resilience4j/build.gradle new file mode 100644 index 00000000000..5e69c67bd78 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/build.gradle @@ -0,0 +1 @@ +apply from: "$rootDir/gradle/java.gradle" diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/build.gradle b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/build.gradle new file mode 100644 index 00000000000..e91b118b237 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/build.gradle @@ -0,0 +1,41 @@ +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + +apply from: "$rootDir/gradle/java.gradle" +apply plugin: 'idea' + +muzzle { + pass { + group = 'io.github.resilience4j' + module = 'resilience4j-all' + versions = '[2.0.0,)' + assertInverse = true + javaVersion = "17" + } +} + +idea { + module { + jdkName = '17' + } +} + +// Set all compile tasks to use JDK17 but let instrumentation code target 1.8 compatibility +project.tasks.withType(AbstractCompile).configureEach { + setJavaVersion(it, 17) +} +compileJava.configure { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 +} + +addTestSuiteForDir('latestDepTest', 'test') + +dependencies { + compileOnly group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.0.0' + + testImplementation group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.0.0' + latestDepTestImplementation group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.+' +} + diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerDecorator.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerDecorator.java new file mode 100644 index 00000000000..8be4ddbec9a --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerDecorator.java @@ -0,0 +1,34 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; + +public final class CircuitBreakerDecorator extends Resilience4jSpanDecorator { + public static final CircuitBreakerDecorator DECORATE = new CircuitBreakerDecorator(); + public static final String TAG_PREFIX = "resilience4j.circuit-breaker."; + public static final String TAG_METRICS_PREFIX = TAG_PREFIX + "metrics."; + + private CircuitBreakerDecorator() { + super(); + } + + @Override + public void decorate(AgentSpan span, CircuitBreaker data) { + span.setTag("resilience4j.circuit_breaker.name", data.getName()); + span.setTag("resilience4j.circuit_breaker.state", data.getState().toString()); + if (Config.get().isResilience4jTagMetricsEnabled()) { + CircuitBreaker.Metrics ms = data.getMetrics(); + span.setTag(TAG_METRICS_PREFIX + "failure_rate", ms.getFailureRate()); + span.setTag(TAG_METRICS_PREFIX + "slow_call_rate", ms.getSlowCallRate()); + span.setTag(TAG_METRICS_PREFIX + "slow_calls", ms.getNumberOfSlowCalls()); + span.setTag( + TAG_METRICS_PREFIX + "slow_successful_calls", ms.getNumberOfSlowSuccessfulCalls()); + span.setTag(TAG_METRICS_PREFIX + "slow_failed_calls", ms.getNumberOfSlowFailedCalls()); + span.setTag(TAG_METRICS_PREFIX + "buffered_calls", ms.getNumberOfBufferedCalls()); + span.setTag(TAG_METRICS_PREFIX + "failed_calls", ms.getNumberOfFailedCalls()); + span.setTag(TAG_METRICS_PREFIX + "not_permitted_calls", ms.getNumberOfNotPermittedCalls()); + span.setTag(TAG_METRICS_PREFIX + "successful_calls", ms.getNumberOfSuccessfulCalls()); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerInstrumentation.java new file mode 100644 index 00000000000..31dc9682255 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerInstrumentation.java @@ -0,0 +1,242 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.core.functions.CheckedConsumer; +import io.github.resilience4j.core.functions.CheckedFunction; +import io.github.resilience4j.core.functions.CheckedRunnable; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class CircuitBreakerInstrumentation extends Resilience4jInstrumentation { + + private static final String CIRCUIT_BREAKER_FQCN = + "io.github.resilience4j.circuitbreaker.CircuitBreaker"; + + private static final String THIS_CLASS = CircuitBreakerInstrumentation.class.getName(); + + public CircuitBreakerInstrumentation() { + super("resilience4j-circuitbreaker"); + } + + @Override + public String instrumentedType() { + return CIRCUIT_BREAKER_FQCN; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedSupplier")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CHECKED_SUPPLIER_FQCN))), + THIS_CLASS + "$CheckedSupplierAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedFunction")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CHECKED_FUNCTION_FQCN))), + THIS_CLASS + "$CheckedFunctionAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedConsumer")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CHECKED_CONSUMER_FQCN))), + THIS_CLASS + "$CheckedConsumerAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCompletionStage")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(SUPPLIER_FQCN))), + THIS_CLASS + "$CompletionStageAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateFuture")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(SUPPLIER_FQCN))), + THIS_CLASS + "$FutureAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateConsumer")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CONSUMER_FQCN))), + THIS_CLASS + "$ConsumerAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedRunnable")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CHECKED_RUNNABLE_FQCN))), + THIS_CLASS + "$CheckedRunnableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCallable")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(CALLABLE_FQCN))), + THIS_CLASS + "$CallableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateRunnable")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(RUNNABLE_FQCN))), + THIS_CLASS + "$RunnableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateSupplier")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(SUPPLIER_FQCN))), + THIS_CLASS + "$SupplierAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateFunction")) + .and(takesArgument(0, named(CIRCUIT_BREAKER_FQCN))) + .and(returns(named(FUNCTION_FQCN))), + THIS_CLASS + "$FunctionAdvice"); + } + + public static class SupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Supplier result) { + result = + new WrapperWithContext.SupplierWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CallableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Callable result) { + result = + new WrapperWithContext.CallableWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class RunnableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Runnable result) { + result = + new WrapperWithContext.RunnableWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class FunctionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Function result) { + result = + new WrapperWithContext.FunctionWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CheckedSupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) CheckedSupplier result) { + result = + new WrapperWithContext.CheckedSupplierWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CheckedFunctionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) CheckedFunction result) { + result = + new WrapperWithContext.CheckedFunctionWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CheckedConsumerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) CheckedConsumer result) { + result = + new WrapperWithContext.CheckedConsumerWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CheckedRunnableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) CheckedRunnable result) { + result = + new WrapperWithContext.CheckedRunnableWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class ConsumerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Consumer result) { + result = + new WrapperWithContext.ConsumerWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class CompletionStageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Supplier> result) { + result = + new WrapperWithContext.SupplierOfCompletionStageWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } + + public static class FutureAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) CircuitBreaker circuitBreaker, + @Advice.Return(readOnly = false) Supplier> result) { + result = + new WrapperWithContext.SupplierOfFutureWithContext<>( + result, CircuitBreakerDecorator.DECORATE, circuitBreaker); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCallableInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCallableInstrumentation.java new file mode 100644 index 00000000000..e4fe74943a3 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCallableInstrumentation.java @@ -0,0 +1,44 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.concurrent.Callable; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class FallbackCallableInstrumentation extends Resilience4jInstrumentation { + public FallbackCallableInstrumentation() { + super("resilience4j-fallback"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.decorators.Decorators$DecorateCallable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("withFallback")), + FallbackCallableInstrumentation.class.getName() + "$CallableAdvice"); + } + + public static class CallableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.FieldValue(value = "callable", readOnly = false) Callable callable) { + callable = + new WrapperWithContext.CallableWithContext<>( + callable, Resilience4jSpanDecorator.DECORATE, null); + } + + // 2.0.0+ + public static void muzzleCheck(CheckedSupplier cs) throws Throwable { + cs.get(); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCheckedSupplierInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCheckedSupplierInstrumentation.java new file mode 100644 index 00000000000..cd753791ee4 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCheckedSupplierInstrumentation.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.core.functions.CheckedSupplier; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class FallbackCheckedSupplierInstrumentation extends Resilience4jInstrumentation { + public FallbackCheckedSupplierInstrumentation() { + super("resilience4j-fallback"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.decorators.Decorators$DecorateCheckedSupplier"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("withFallback")), + FallbackCheckedSupplierInstrumentation.class.getName() + "$CheckedSupplierAdvice"); + } + + public static class CheckedSupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.FieldValue(value = "supplier", readOnly = false) CheckedSupplier supplier) { + supplier = + new WrapperWithContext.CheckedSupplierWithContext<>( + supplier, Resilience4jSpanDecorator.DECORATE, null); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCompletionStageInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCompletionStageInstrumentation.java new file mode 100644 index 00000000000..16b275825eb --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackCompletionStageInstrumentation.java @@ -0,0 +1,46 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class FallbackCompletionStageInstrumentation extends Resilience4jInstrumentation { + public FallbackCompletionStageInstrumentation() { + super("resilience4j-fallback"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.decorators.Decorators$DecorateCompletionStage"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("withFallback")), + FallbackCompletionStageInstrumentation.class.getName() + "$CompletionStageAdvice"); + } + + public static class CompletionStageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.FieldValue(value = "stageSupplier", readOnly = false) + Supplier> stageSupplier) { + stageSupplier = + new WrapperWithContext.SupplierOfCompletionStageWithContext<>( + stageSupplier, Resilience4jSpanDecorator.DECORATE, null); + } + + // 2.0.0+ + public static void muzzleCheck(CheckedSupplier cs) throws Throwable { + cs.get(); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackSupplierInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackSupplierInstrumentation.java new file mode 100644 index 00000000000..7066b92dc10 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackSupplierInstrumentation.java @@ -0,0 +1,44 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.function.Supplier; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class FallbackSupplierInstrumentation extends Resilience4jInstrumentation { + public FallbackSupplierInstrumentation() { + super("resilience4j-fallback"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.decorators.Decorators$DecorateSupplier"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("withFallback")), + FallbackSupplierInstrumentation.class.getName() + "$SupplierAdvice"); + } + + public static class SupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.FieldValue(value = "supplier", readOnly = false) Supplier supplier) { + supplier = + new WrapperWithContext.SupplierWithContext<>( + supplier, Resilience4jSpanDecorator.DECORATE, null); + } + + // 2.0.0+ + public static void muzzleCheck(CheckedSupplier cs) throws Throwable { + cs.get(); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jInstrumentation.java new file mode 100644 index 00000000000..6f6036224cf --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jInstrumentation.java @@ -0,0 +1,53 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.concurrent.Callable; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public abstract class Resilience4jInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public static final String CHECKED_SUPPLIER_FQCN = + "io.github.resilience4j.core.functions.CheckedSupplier"; + public static final String CHECKED_RUNNABLE_FQCN = + "io.github.resilience4j.core.functions.CheckedRunnable"; + public static final String CHECKED_FUNCTION_FQCN = + "io.github.resilience4j.core.functions.CheckedFunction"; + public static final String CHECKED_CONSUMER_FQCN = + "io.github.resilience4j.core.functions.CheckedConsumer"; + public static final String SUPPLIER_FQCN = Supplier.class.getName(); + public static final String FUNCTION_FQCN = Function.class.getName(); + public static final String CONSUMER_FQCN = Consumer.class.getName(); + public static final String CALLABLE_FQCN = Callable.class.getName(); + public static final String RUNNABLE_FQCN = Runnable.class.getName(); + + public Resilience4jInstrumentation(String... additionalNames) { + super("resilience4j", additionalNames); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".WrapperWithContext", + packageName + ".WrapperWithContext$CallableWithContext", + packageName + ".WrapperWithContext$CheckedRunnableWithContext", + packageName + ".WrapperWithContext$RunnableWithContext", + packageName + ".WrapperWithContext$CheckedFunctionWithContext", + packageName + ".WrapperWithContext$ConsumerWithContext", + packageName + ".WrapperWithContext$CheckedSupplierWithContext", + packageName + ".WrapperWithContext$CheckedConsumerWithContext", + packageName + ".WrapperWithContext$FunctionWithContext", + packageName + ".WrapperWithContext$SupplierOfCompletionStageWithContext", + packageName + ".WrapperWithContext$SupplierWithContext", + packageName + ".WrapperWithContext$SupplierOfFutureWithContext", + packageName + ".WrapperWithContext$FinishOnGetFuture", + packageName + ".Resilience4jSpanDecorator", + packageName + ".Resilience4jSpan", + packageName + ".CircuitBreakerDecorator", + packageName + ".RetryDecorator", + }; + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpan.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpan.java new file mode 100644 index 00000000000..cef68e3f655 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpan.java @@ -0,0 +1,22 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; + +public class Resilience4jSpan { + public static final CharSequence SPAN_NAME = UTF8BytesString.create("resilience4j"); + public static final String INSTRUMENTATION_NAME = "resilience4j"; + + public static AgentSpan current() { + AgentSpan span = AgentTracer.activeSpan(); + if (span == null || !SPAN_NAME.equals(span.getOperationName())) { + return null; + } + return span; + } + + public static AgentSpan start() { + return AgentTracer.startSpan(INSTRUMENTATION_NAME, SPAN_NAME); + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpanDecorator.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpanDecorator.java new file mode 100644 index 00000000000..93a24368c7f --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jSpanDecorator.java @@ -0,0 +1,41 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; + +public class Resilience4jSpanDecorator extends BaseDecorator { + public static final Resilience4jSpanDecorator DECORATE = new Resilience4jSpanDecorator<>(); + + private static final CharSequence RESILIENCE4J = UTF8BytesString.create("resilience4j"); + + @Override + protected String[] instrumentationNames() { + return new String[] {"resilience4j"}; + } + + @Override + protected CharSequence spanType() { + return null; + } + + @Override + protected CharSequence component() { + return RESILIENCE4J; + } + + @Override + public AgentSpan afterStart(AgentSpan span) { + super.afterStart(span); + span.setSpanName(RESILIENCE4J); + span.setTag(Tags.SPAN_KIND, Tags.SPAN_KIND_INTERNAL); + if (Config.get().isResilience4jMeasuredEnabled()) { + span.setMeasured(true); + } + return span; + } + + public void decorate(AgentSpan span, T data) {} +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryDecorator.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryDecorator.java new file mode 100644 index 00000000000..be4da0523e1 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryDecorator.java @@ -0,0 +1,42 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.github.resilience4j.retry.Retry; + +public final class RetryDecorator extends Resilience4jSpanDecorator { + public static final RetryDecorator DECORATE = new RetryDecorator(); + public static final String TAG_PREFIX = "resilience4j.retry."; + public static final String TAG_METRICS_PREFIX = TAG_PREFIX + "metrics."; + + private RetryDecorator() { + super(); + } + + @Override + protected String[] instrumentationNames() { + return new String[] {"resilience4j.retry"}; + } + + @Override + public void decorate(AgentSpan span, Retry data) { + span.setTag(TAG_PREFIX + "name", data.getName()); + span.setTag(TAG_PREFIX + "max_attempts", data.getRetryConfig().getMaxAttempts()); + span.setTag( + TAG_PREFIX + "fail_after_max_attempts", data.getRetryConfig().isFailAfterMaxAttempts()); + if (Config.get().isResilience4jTagMetricsEnabled()) { + Retry.Metrics ms = data.getMetrics(); + span.setTag( + TAG_METRICS_PREFIX + "success_without_retry", + ms.getNumberOfSuccessfulCallsWithoutRetryAttempt()); + span.setTag( + TAG_METRICS_PREFIX + "failed_without_retry", + ms.getNumberOfFailedCallsWithoutRetryAttempt()); + span.setTag( + TAG_METRICS_PREFIX + "success_with_retry", + ms.getNumberOfSuccessfulCallsWithRetryAttempt()); + span.setTag( + TAG_METRICS_PREFIX + "failed_with_retry", ms.getNumberOfFailedCallsWithRetryAttempt()); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryInstrumentation.java new file mode 100644 index 00000000000..6a777dffaf9 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryInstrumentation.java @@ -0,0 +1,174 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import io.github.resilience4j.core.functions.CheckedFunction; +import io.github.resilience4j.core.functions.CheckedRunnable; +import io.github.resilience4j.core.functions.CheckedSupplier; +import io.github.resilience4j.retry.Retry; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class RetryInstrumentation extends Resilience4jInstrumentation { + + private static final String RETRY_FQCN = "io.github.resilience4j.retry.Retry"; + private static final String THIS_CLASS = RetryInstrumentation.class.getName(); + + public RetryInstrumentation() { + super("resilience4j-retry"); + } + + @Override + public String instrumentedType() { + return RETRY_FQCN; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCompletionStage")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(SUPPLIER_FQCN))), + THIS_CLASS + "$CompletionStageAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedSupplier")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(CHECKED_SUPPLIER_FQCN))), + THIS_CLASS + "$CheckedSupplierAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedRunnable")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(CHECKED_RUNNABLE_FQCN))), + THIS_CLASS + "$CheckedRunnableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCallable")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(CALLABLE_FQCN))), + THIS_CLASS + "$CallableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateSupplier")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(SUPPLIER_FQCN))), + THIS_CLASS + "$SupplierAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateFunction")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(FUNCTION_FQCN))), + THIS_CLASS + "$FunctionAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateCheckedFunction")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(CHECKED_FUNCTION_FQCN))), + THIS_CLASS + "$CheckedFunctionAdvice"); + transformer.applyAdvice( + isMethod() + .and(isStatic()) + .and(named("decorateRunnable")) + .and(takesArgument(0, named(RETRY_FQCN))) + .and(returns(named(RUNNABLE_FQCN))), + THIS_CLASS + "$RunnableAdvice"); + } + + public static class SupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) Supplier result) { + result = new WrapperWithContext.SupplierWithContext<>(result, RetryDecorator.DECORATE, retry); + } + } + + public static class CallableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) Callable result) { + result = new WrapperWithContext.CallableWithContext<>(result, RetryDecorator.DECORATE, retry); + } + } + + public static class FunctionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) Function result) { + result = new WrapperWithContext.FunctionWithContext<>(result, RetryDecorator.DECORATE, retry); + } + } + + public static class CheckedFunctionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) CheckedFunction result) { + result = + new WrapperWithContext.CheckedFunctionWithContext<>( + result, RetryDecorator.DECORATE, retry); + } + } + + public static class CheckedSupplierAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) CheckedSupplier result) { + result = + new WrapperWithContext.CheckedSupplierWithContext<>( + result, RetryDecorator.DECORATE, retry); + } + } + + public static class CheckedRunnableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) CheckedRunnable result) { + result = + new WrapperWithContext.CheckedRunnableWithContext<>( + result, RetryDecorator.DECORATE, retry); + } + } + + public static class CompletionStageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, + @Advice.Return(readOnly = false) Supplier> result) { + result = + new WrapperWithContext.SupplierOfCompletionStageWithContext<>( + result, RetryDecorator.DECORATE, retry); + } + } + + public static class RunnableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void afterExecute( + @Advice.Argument(value = 0) Retry retry, @Advice.Return(readOnly = false) Runnable result) { + result = new WrapperWithContext.RunnableWithContext<>(result, RetryDecorator.DECORATE, retry); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/WrapperWithContext.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/WrapperWithContext.java new file mode 100644 index 00000000000..a791f605bbb --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/main/java/datadog/trace/instrumentation/resilience4j/WrapperWithContext.java @@ -0,0 +1,329 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import io.github.resilience4j.core.functions.CheckedConsumer; +import io.github.resilience4j.core.functions.CheckedFunction; +import io.github.resilience4j.core.functions.CheckedRunnable; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public class WrapperWithContext { + + public static final class CheckedConsumerWithContext extends WrapperWithContext + implements CheckedConsumer { + private final CheckedConsumer delegate; + + public CheckedConsumerWithContext( + CheckedConsumer delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public void accept(I arg) throws Throwable { + try (AgentScope ignore = activateScope()) { + delegate.accept(arg); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class ConsumerWithContext extends WrapperWithContext + implements Consumer { + private final Consumer delegate; + + public ConsumerWithContext( + Consumer delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public void accept(I arg) { + try (AgentScope ignore = activateScope()) { + delegate.accept(arg); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class CheckedFunctionWithContext extends WrapperWithContext + implements CheckedFunction { + private final CheckedFunction delegate; + + public CheckedFunctionWithContext( + CheckedFunction delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public O apply(I arg) throws Throwable { + try (AgentScope ignore = activateScope()) { + return delegate.apply(arg); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class SupplierWithContext extends WrapperWithContext + implements Supplier { + private final Supplier delegate; + + public SupplierWithContext( + Supplier delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public O get() { + try (AgentScope ignore = activateScope()) { + return delegate.get(); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class CallableWithContext extends WrapperWithContext + implements Callable { + private final Callable delegate; + + public CallableWithContext( + Callable delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public O call() throws Exception { + try (AgentScope ignore = activateScope()) { + return delegate.call(); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class FunctionWithContext extends WrapperWithContext + implements Function { + private final Function delegate; + + public FunctionWithContext( + Function delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public O apply(I arg) { + try (AgentScope ignore = activateScope()) { + return delegate.apply(arg); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class CheckedSupplierWithContext extends WrapperWithContext + implements CheckedSupplier { + private final CheckedSupplier delegate; + + public CheckedSupplierWithContext( + CheckedSupplier delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public O get() throws Throwable { + try (AgentScope ignore = activateScope()) { + return delegate.get(); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class CheckedRunnableWithContext extends WrapperWithContext + implements CheckedRunnable { + private final CheckedRunnable delegate; + + public CheckedRunnableWithContext( + CheckedRunnable delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public void run() throws Throwable { + try (AgentScope ignore = activateScope()) { + delegate.run(); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class RunnableWithContext extends WrapperWithContext + implements Runnable { + private final Runnable delegate; + + public RunnableWithContext( + Runnable delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public void run() { + try (AgentScope ignore = activateScope()) { + delegate.run(); + } finally { + finishSpanIfNeeded(); + } + } + } + + public static final class SupplierOfCompletionStageWithContext extends WrapperWithContext + implements Supplier> { + private final Supplier> delegate; + + public SupplierOfCompletionStageWithContext( + Supplier> delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public CompletionStage get() { + try (AgentScope ignore = activateScope()) { + return delegate + .get() + .whenComplete( + (v, e) -> { + finishSpanIfNeeded(); + }); + } + } + } + + public static final class SupplierOfFutureWithContext extends WrapperWithContext + implements Supplier> { + private final Supplier> delegate; + + public SupplierOfFutureWithContext( + Supplier> delegate, Resilience4jSpanDecorator spanDecorator, T data) { + super(spanDecorator, data); + this.delegate = delegate; + } + + @Override + public Future get() { + try (AgentScope ignore = activateScope()) { + Future future = delegate.get(); + if (future instanceof CompletableFuture) { + ((CompletableFuture) future) + .whenComplete( + (v, e) -> { + finishSpanIfNeeded(); + }); + return future; + } + return new FinishOnGetFuture<>(future, this); + } + } + } + + private static final class FinishOnGetFuture implements Future { + private final Future delegate; + private final WrapperWithContext context; + + FinishOnGetFuture(Future delegate, WrapperWithContext context) { + this.delegate = delegate; + this.context = context; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + try { + return delegate.cancel(mayInterruptIfRunning); + } finally { + context.finishSpanIfNeeded(); + } + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + try { + return delegate.get(); + } finally { + context.finishSpanIfNeeded(); + } + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + try { + return delegate.get(timeout, unit); + } finally { + context.finishSpanIfNeeded(); + } + } + } + + private final Resilience4jSpanDecorator spanDecorator; + private final T data; + private AgentSpan span; + + protected WrapperWithContext(Resilience4jSpanDecorator spanDecorator, T data) { + this.spanDecorator = spanDecorator; + this.data = data; + } + + public AgentScope activateScope() { + AgentSpan current = Resilience4jSpan.current(); + AgentSpan owned = current == null ? Resilience4jSpan.start() : null; + if (owned != null) { + current = owned; + spanDecorator.afterStart(owned); + this.span = owned; + } + spanDecorator.decorate(current, data); + return AgentTracer.activateSpan(current); + } + + public void finishSpanIfNeeded() { + if (span != null) { + spanDecorator.beforeFinish(span); + span.finish(); + span = null; + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/CircuitBreakerTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/CircuitBreakerTest.groovy new file mode 100644 index 00000000000..c3aa8be4bc4 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/CircuitBreakerTest.groovy @@ -0,0 +1,263 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import datadog.trace.bootstrap.instrumentation.api.Tags +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.core.functions.CheckedConsumer +import io.github.resilience4j.core.functions.CheckedRunnable +import io.github.resilience4j.core.functions.CheckedSupplier +import io.github.resilience4j.core.functions.CheckedFunction + +import java.util.concurrent.Callable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.function.Consumer +import java.util.function.Function +import java.util.function.Supplier + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class CircuitBreakerTest extends InstrumentationSpecification { + static singleThreadExecutor = Executors.newSingleThreadExecutor() + + def "decorate span with circuit-breaker"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_MEASURED_ENABLED, measuredEnabled.toString()) + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_TAG_METRICS_ENABLED, tagMetricsEnabled.toString()) + + def ms = Mock(CircuitBreaker.Metrics) + def cb = Mock(CircuitBreaker) + cb.getName() >> "cb1" + cb.getState() >> CircuitBreaker.State.CLOSED + cb.tryAcquirePermission() >> true + cb.getMetrics() >> ms + ms.getFailureRate() >> 0.1f + ms.getSlowCallRate() >> 0.2f + ms.getNumberOfBufferedCalls() >> 12 + ms.getNumberOfFailedCalls() >> 13 + ms.getNumberOfNotPermittedCalls() >> 2 + ms.getNumberOfSlowCalls() >> 23 + ms.getNumberOfSlowFailedCalls() >> 3 + ms.getNumberOfSlowSuccessfulCalls() >> 33 + ms.getNumberOfSuccessfulCalls() >> 50 + + when: + Supplier supplier = CircuitBreaker.decorateSupplier(cb) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + measured(measuredEnabled) + tags { + "$Tags.COMPONENT" "resilience4j" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_INTERNAL + "resilience4j.circuit_breaker.name" "cb1" + "resilience4j.circuit_breaker.state" "CLOSED" + if (tagMetricsEnabled) { + "resilience4j.circuit-breaker.metrics.failure_rate" 0.1f + "resilience4j.circuit-breaker.metrics.slow_call_rate" 0.2f + "resilience4j.circuit-breaker.metrics.buffered_calls" 12 + "resilience4j.circuit-breaker.metrics.failed_calls" 13 + "resilience4j.circuit-breaker.metrics.not_permitted_calls" 2 + "resilience4j.circuit-breaker.metrics.slow_calls" 23 + "resilience4j.circuit-breaker.metrics.slow_failed_calls" 3 + "resilience4j.circuit-breaker.metrics.slow_successful_calls" 33 + "resilience4j.circuit-breaker.metrics.successful_calls" 50 + } + defaultTags() + } + } + span(2) { + operationName "serviceCall" + childOf(span(1)) + errored false + } + } + } + + where: + measuredEnabled | tagMetricsEnabled + true | true + false | false + true | false + false | true + } + + def "decorateCheckedSupplier"() { + when: + CheckedSupplier supplier = CircuitBreaker.decorateCheckedSupplier(CircuitBreaker.ofDefaults("cb")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateCompletionStage"() { + when: + Supplier> supplier = CircuitBreaker.decorateCompletionStage(CircuitBreaker.ofDefaults("cb"), { + CompletableFuture.supplyAsync({ + serviceCall("foobar") + }, singleThreadExecutor) + }) + def future = runUnderTrace("parent"){supplier.get()}.toCompletableFuture() + + then: + future.get() == "foobar" + and: + assertExpectedTrace() + } + + def "decorateCheckedRunnable"() { + when: + CheckedRunnable runnable = CircuitBreaker.decorateCheckedRunnable(CircuitBreaker.ofDefaults("cb")) { serviceCall("foobar") } + + then: + runUnderTrace("parent") { + runnable.run() + "a" + } + and: + assertExpectedTrace() + } + + def "decorateCallable"() { + when: + Callable callable = CircuitBreaker.decorateCallable(CircuitBreaker.ofDefaults("cb")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){callable.call()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateSupplier"() { + when: + Supplier supplier = CircuitBreaker.decorateSupplier(CircuitBreaker.ofDefaults("cb")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateConsumer"() { + + when: + Consumer consumer = CircuitBreaker.decorateConsumer(CircuitBreaker.ofDefaults("cb")) { s -> serviceCall(s) } + + then: + runUnderTrace("parent") { + consumer.accept("test") + "a" + } + and: + assertExpectedTrace() + } + + def "decorateCheckedConsumer"() { + + when: + CheckedConsumer consumer = CircuitBreaker.decorateCheckedConsumer(CircuitBreaker.ofDefaults("cb")) { s -> serviceCall(s) } + + then: + runUnderTrace("parent") { + consumer.accept("test") + "a" + } + and: + assertExpectedTrace() + } + + def "decorateRunnable"() { + when: + Runnable runnable = CircuitBreaker.decorateRunnable(CircuitBreaker.ofDefaults("cb")) { serviceCall("foobar") } + + then: + runUnderTrace("parent") { + runnable.run() + "a" + } + and: + assertExpectedTrace() + } + + def "decorateFunction"() { + when: + Function function = CircuitBreaker.decorateFunction(CircuitBreaker.ofDefaults("cb")) { v -> serviceCall("foobar-$v") } + + then: + runUnderTrace("parent"){function.apply("test")} == "foobar-test" + and: + assertExpectedTrace() + } + + def "decorateCheckedFunction"() { + when: + CheckedFunction function = CircuitBreaker.decorateCheckedFunction(CircuitBreaker.ofDefaults("cb")) { v -> serviceCall("foobar-$v") } + + then: + runUnderTrace("parent") { function.apply("test") } == "foobar-test" + and: + assertExpectedTrace() + } + + def "decorateFuture"() { + setup: + def executor = singleThreadExecutor + when: + Supplier> supplier = CircuitBreaker.decorateFuture(CircuitBreaker.ofDefaults("cb"), { + CompletableFuture.supplyAsync({ + serviceCall("foobar") + }, executor) + }) + + then: + def future = runUnderTrace("parent"){supplier.get()} + future.get() == "foobar" + and: + assertExpectedTrace() + } + + private void assertExpectedTrace() { + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value) { + AgentTracer.startSpan("test", "serviceCall").finish() + value + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/FallbackTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/FallbackTest.groovy new file mode 100644 index 00000000000..ca8a25c5cc8 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/FallbackTest.groovy @@ -0,0 +1,218 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.github.resilience4j.core.functions.CheckedBiFunction +import io.github.resilience4j.core.functions.CheckedFunction +import io.github.resilience4j.decorators.Decorators +import io.github.resilience4j.decorators.Decorators.DecorateSupplier +import io.github.resilience4j.decorators.Decorators.DecorateCallable +import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier +import io.github.resilience4j.core.functions.CheckedSupplier +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.Executors +import java.util.function.BiFunction +import java.util.function.Function +import java.util.function.Predicate +import java.util.function.Supplier +import java.util.function.UnaryOperator + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class FallbackTest extends InstrumentationSpecification { + static singleThreadExecutor = Executors.newSingleThreadExecutor() + + def "ofSupplier"(DecorateSupplier decorateSupplier) { + setup: + def supplier = decorateSupplier.decorate() + + when: + def result = runUnderTrace("parent") { supplier.get() } + + then: + result == "fallbackResult" + and: + assertExpectedTrace() + + where: + decorateSupplier << [ + Decorators.ofSupplier{ + serviceCallErr(new IllegalStateException("test")) + }.withFallback({ t -> serviceCall("fallbackResult", "fallbackCall") } as Function) + , + Decorators.ofSupplier{ + serviceCall("badResult", "serviceCall") + }.withFallback({ it == "badResult" } as Predicate, { serviceCall("fallbackResult", "fallbackCall") } as UnaryOperator) + , + Decorators.ofSupplier{ + serviceCallErr(new IllegalStateException("test")) + } + .withFallback({ v, t -> serviceCall("fallbackResult", "fallbackCall") } as BiFunction) + , + Decorators.ofSupplier{ + serviceCallErr(new IllegalStateException("test")) + } + .withFallback(List.of(IllegalStateException), {t -> serviceCall("fallbackResult", "fallbackCall") } as Function), + ] + } + + def "ofCheckedSupplier"(DecorateCheckedSupplier decorateCheckedSupplier) { + setup: + CheckedSupplier supplier = decorateCheckedSupplier.decorate() + + when: + def result = runUnderTrace("parent") { supplier.get() } + + then: + result == "fallbackResult" + and: + assertExpectedTrace() + + where: + decorateCheckedSupplier << [ + Decorators.ofCheckedSupplier{ + serviceCallErr(new IllegalStateException("test")) + }.withFallback({ t -> serviceCall("fallbackResult", "fallbackCall") } as CheckedFunction) + , + Decorators.ofCheckedSupplier{ + serviceCall("badResult", "serviceCall") + }.withFallback({ it == "badResult" } as Predicate, { serviceCall("fallbackResult", "fallbackCall") } as CheckedFunction) + , + Decorators.ofCheckedSupplier{ + serviceCallErr(new IllegalStateException("test")) + } + .withFallback({ v, t -> serviceCall("fallbackResult", "fallbackCall") } as CheckedBiFunction) + , + Decorators.ofCheckedSupplier{ + serviceCallErr(new IllegalStateException("test")) + } + .withFallback(List.of(IllegalStateException), { t -> serviceCall("fallbackResult", "fallbackCall") } as CheckedFunction), + ] + } + + def "ofCallable"(DecorateCallable decorateCallable) { + setup: + def callable = decorateCallable.decorate() + + when: + def result = runUnderTrace("parent") { callable.call() } + + then: + result == "fallbackResult" + and: + assertExpectedTrace() + + where: + decorateCallable << [ + Decorators.ofCallable{ v -> + serviceCallErr(new IllegalStateException("test")) + }.withFallback({ t -> serviceCall("fallbackResult", "fallbackCall") } as Function) + , + Decorators.ofCallable{ v -> + serviceCall("badResult", "serviceCall") + }.withFallback({ it == "badResult" } as Predicate, { serviceCall("fallbackResult", "fallbackCall") } as UnaryOperator) + , + Decorators.ofCallable{ v -> + serviceCallErr(new IllegalStateException("test")) + } + .withFallback({ v, t -> serviceCall("fallbackResult", "fallbackCall") } as BiFunction) + , + Decorators.ofCallable{ v -> + serviceCallErr(new IllegalStateException("test")) + } + .withFallback(List.of(IllegalStateException), { t -> serviceCall("fallbackResult", "fallbackCall") } as Function), + ] + } + + def "ofCompletionStage"(Supplier> supplier) { + when: + def future = runUnderTrace("parent") { supplier.get().toCompletableFuture() } + + then: + future.get() == "fallbackResult" + + then: + assertExpectedTrace() + + where: + supplier << [ + Decorators + .ofCompletionStage { + CompletableFuture.supplyAsync({ + serviceCallErr(new IllegalStateException("test")) + }, singleThreadExecutor) + } + .withFallback({ Throwable t -> + serviceCall("fallbackResult", "fallbackCall") + } as Function) + .decorate(), + Decorators + .ofCompletionStage { + CompletableFuture.supplyAsync({ + serviceCall("badResult", "serviceCall") + }, singleThreadExecutor) + } + .withFallback({ it == "badResult" } as Predicate, { serviceCall("fallbackResult", "fallbackCall") } as UnaryOperator) + .decorate(), + Decorators + .ofCompletionStage { + CompletableFuture.supplyAsync({ + serviceCallErr(new IllegalStateException("test")) + }, singleThreadExecutor) + } + .withFallback({ v, t -> serviceCall("fallbackResult", "fallbackCall") } as BiFunction) + .decorate(), + Decorators + .ofCompletionStage { + CompletableFuture.supplyAsync({ + serviceCallErr(new IllegalStateException("test")) + }, singleThreadExecutor) + } + .withFallback(List.of(IllegalStateException), { t -> serviceCall("fallbackResult", "fallbackCall") } as Function) + .decorate(), + ] + } + + private void assertExpectedTrace() { + assertTraces(1) { + trace(4) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + span(3) { + operationName "fallbackCall" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value, String name) { + AgentTracer.startSpan("test", name).finish() + value + } + + def serviceCallErr(IllegalStateException e) { + def span = AgentTracer.startSpan("test", "serviceCall") + def scope = AgentTracer.activateSpan(span) + try { + throw e + } finally { + scope.close() + span.finish() + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/RetryTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/RetryTest.groovy new file mode 100644 index 00000000000..888fdb7d94d --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/RetryTest.groovy @@ -0,0 +1,310 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import datadog.trace.bootstrap.instrumentation.api.Tags +import io.github.resilience4j.core.functions.CheckedFunction +import io.github.resilience4j.core.functions.CheckedRunnable +import io.github.resilience4j.core.functions.CheckedSupplier +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import java.util.concurrent.Callable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executors +import java.util.function.Function +import java.util.function.Supplier + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class RetryTest extends InstrumentationSpecification { + static singleThreadExecutor = Executors.newSingleThreadExecutor() + + def "decorate span with retry"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_MEASURED_ENABLED, measuredEnabled.toString()) + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_TAG_METRICS_ENABLED, tagMetricsEnabled.toString()) + + def ms = Mock(Retry.Metrics) + def rc = Mock(RetryConfig) + def rt = Mock(Retry) + def cx = Mock(Retry.Context) + rt.getName() >> "rt1" + rt.getRetryConfig() >> rc + rt.getMetrics() >> ms + rt.context() >> cx + rc.getMaxAttempts() >> 23 + rc.isFailAfterMaxAttempts() >> true + ms.getNumberOfFailedCallsWithoutRetryAttempt() >> 1 + ms.getNumberOfFailedCallsWithRetryAttempt() >> 2 + ms.getNumberOfSuccessfulCallsWithoutRetryAttempt() >> 3 + ms.getNumberOfSuccessfulCallsWithRetryAttempt() >> 4 + + when: + Supplier supplier = Retry.decorateSupplier(rt) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + measured(measuredEnabled) + tags { + "$Tags.COMPONENT" "resilience4j" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_INTERNAL + "resilience4j.retry.name" "rt1" + "resilience4j.retry.max_attempts" 23 + "resilience4j.retry.fail_after_max_attempts" true + if (tagMetricsEnabled) { + "resilience4j.retry.metrics.failed_without_retry" 1 + "resilience4j.retry.metrics.failed_with_retry" 2 + "resilience4j.retry.metrics.success_without_retry" 3 + "resilience4j.retry.metrics.success_with_retry" 4 + } + defaultTags() + } + } + + span(2) { + operationName "serviceCall" + childOf(span(1)) + errored false + } + } + } + + where: + measuredEnabled | tagMetricsEnabled + true | true + false | false + true | false + false | true + } + + def "decorateCompletionStage"() { + setup: + when: + def scheduler = Executors.newSingleThreadScheduledExecutor() + Supplier> supplier = Retry.decorateCompletionStage( + Retry.ofDefaults("rt"), scheduler, { + CompletableFuture.supplyAsync({ + serviceCall("foobar") + }, singleThreadExecutor) + } + ) + + then: + runUnderTrace("parent"){supplier.get().toCompletableFuture()}.get() == "foobar" + and: + assertExpectedTrace() + } + + def "decorateCheckedSupplier"() { + when: + CheckedSupplier supplier = Retry.decorateCheckedSupplier(Retry.ofDefaults("rt")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateCheckedRunnable"() { + when: + CheckedRunnable runnable = Retry.decorateCheckedRunnable(Retry.ofDefaults("rt")) { serviceCall("foobar") } + + then: + runUnderTrace("parent") { + runnable.run() + "a" + } + and: + assertExpectedTrace() + } + + def "decorateCheckedFunction"() { + when: + CheckedFunction function = Retry.decorateCheckedFunction(Retry.ofDefaults("rt")) { v -> serviceCall("foobar-$v") } + + then: + runUnderTrace("parent") { function.apply("test") } == "foobar-test" + and: + assertExpectedTrace() + } + + def "decorateSupplier"() { + when: + Supplier supplier = Retry.decorateSupplier(Retry.ofDefaults("rt")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateCallable"() { + when: + Callable callable = Retry.decorateCallable(Retry.ofDefaults("rt")) { serviceCall("foobar") } + + then: + runUnderTrace("parent"){callable.call()} == "foobar" + and: + assertExpectedTrace() + } + + def "decorateRunnable"() { + when: + Runnable runnable = Retry.decorateRunnable(Retry.ofDefaults("rt")) { serviceCall("foobar") } + + then: + runUnderTrace("parent") { + runnable.run() + "a" + } + and: + assertExpectedTrace() + } + + def "decorateFunction"() { + when: + Function function = Retry.decorateFunction(Retry.ofDefaults("rt")) { v -> serviceCall("foobar-$v") } + + then: + runUnderTrace("parent"){function.apply("test")} == "foobar-test" + and: + assertExpectedTrace() + } + + def "decorateSupplier retry twice on error -- second call scoped by the r4j span"() { + when: + Supplier supplier = Retry.decorateSupplier( + Retry.of("rt", RetryConfig.custom().maxAttempts(2).build()) + ) { serviceCallErr(new IllegalStateException("error")) } + runUnderTrace("parent") { supplier.get() } + then: + thrown(IllegalStateException) + and: + assertTraces(1) { + trace(4) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored true // b/o unhandled exception + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + // second attempt span under the retry span + span(3) { + operationName "serviceCall" + childOf span(1) + errored false + } + } + } + } + + def "decorateCompletionStage retry twice on error -- second call scoped by the r4j span"() { + setup: + def scheduler = Executors.newSingleThreadScheduledExecutor() + Supplier> supplier = Retry.decorateCompletionStage( + Retry.of("rt", RetryConfig.custom().maxAttempts(2).build()), scheduler, { + CompletableFuture.supplyAsync({ + serviceCallErr(new IllegalStateException("error")) + }, singleThreadExecutor) + } + ) + + when: + def future = runUnderTrace("parent") { supplier.get().toCompletableFuture() } + future.get() + + then: + def ee = thrown(ExecutionException) + ee.cause instanceof IllegalStateException + and: + assertTraces(1) { + trace(4) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + // second attempt span under the retry span + span(3) { + operationName "serviceCall" + childOf span(1) + errored false + } + } + } + } + + private void assertExpectedTrace() { + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value) { + AgentTracer.startSpan("test", "serviceCall").finish() + value + } + + void serviceCallErr(IllegalStateException e) { + def span = AgentTracer.startSpan("test", "serviceCall") + def scope = AgentTracer.activateSpan(span) + try { + throw e + } finally { + scope.close() + span.finish() + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/StackedDecoratorsTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/StackedDecoratorsTest.groovy new file mode 100644 index 00000000000..513a2635d54 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-2/src/test/groovy/StackedDecoratorsTest.groovy @@ -0,0 +1,79 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.github.resilience4j.bulkhead.Bulkhead +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.decorators.Decorators +import io.github.resilience4j.ratelimiter.RateLimiter +import io.github.resilience4j.retry.Retry +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.Executors +import java.util.function.Function +import java.util.function.Supplier +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class StackedDecoratorsTest extends InstrumentationSpecification { + + def "happy path sync test"() { + when: + Supplier supplier = Decorators + .ofSupplier{serviceCall("foobar", "serviceCall")} + .withCircuitBreaker(CircuitBreaker.ofDefaults("A")) + .withRateLimiter(RateLimiter.ofDefaults("L")) // not instrumented, doesn't break the scope + .withRetry(Retry.ofDefaults("R")) + .withBulkhead(Bulkhead.ofDefaults("B")) // not instrumented, doesn't break the scope + .withFallback({ t -> serviceCall("fallbackResult", "fallbackCall") } as Function) + .decorate() + + then: + runUnderTrace("parent"){supplier.get()} == "foobar" + and: + assertExpectedTrace() + } + + def "happy path async test"() { + when: + Supplier> supplier = Decorators + .ofCompletionStage { + CompletableFuture.supplyAsync({ + serviceCall("foobar", "serviceCall") + }, Executors.newSingleThreadExecutor()) + } + .withCircuitBreaker(CircuitBreaker.ofDefaults("A")) + .withRetry(Retry.ofDefaults("R"), Executors.newSingleThreadScheduledExecutor()) + .withFallback({ t -> serviceCall("fallbackResult", "fallbackCall") } as Function) + .decorate() + + then: + runUnderTrace("parent"){supplier.get().toCompletableFuture()}.get() == "foobar" + and: + assertExpectedTrace() + } + + private void assertExpectedTrace() { + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value, String name) { AgentTracer.startSpan("test", name).finish() + value + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/build.gradle b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/build.gradle new file mode 100644 index 00000000000..832b52a4776 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/build.gradle @@ -0,0 +1,50 @@ +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + +apply from: "$rootDir/gradle/java.gradle" +apply plugin: 'idea' + +muzzle { + pass { + group = 'io.github.resilience4j' + module = 'resilience4j-reactor' + versions = '[2.0.0,)' + assertInverse = true + javaVersion = "17" + } +} + +idea { + module { + jdkName = '17' + } +} + +// Set all compile tasks to use JDK17 but let instrumentation code target 1.8 compatibility +project.tasks.withType(AbstractCompile).configureEach { + setJavaVersion(it, 17) +} +compileJava.configure { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 +} + +addTestSuiteForDir('latestDepTest', 'test') + +dependencies { + implementation project(':dd-java-agent:instrumentation:resilience4j:resilience4j-2') + + compileOnly group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.0.0' + testImplementation group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.0.0' + latestDepTestImplementation group: 'io.github.resilience4j', name: 'resilience4j-all', version: '2.+' + + compileOnly group: 'io.github.resilience4j', name: 'resilience4j-reactor', version: '2.0.0' + testImplementation group: 'io.github.resilience4j', name: 'resilience4j-reactor', version: '2.0.0' + latestDepTestImplementation group: 'io.github.resilience4j', name: 'resilience4j-reactor', version: '2.+' + + // Include other instruments that we rely on and that must not conflict with each other + testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1') + testImplementation project(':dd-java-agent:instrumentation:reactive-streams') + testImplementation project(':dd-java-agent:instrumentation:java-concurrent') +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java new file mode 100644 index 00000000000..08b6a2f0013 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; + +@AutoService(InstrumenterModule.class) +public class CircuitBreakerOperatorInstrumentation extends Resilience4jReactorInstrumentation { + + public CircuitBreakerOperatorInstrumentation() { + super("resilience4j-circuitbreaker"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("apply")) + .and(takesArgument(0, named("org.reactivestreams.Publisher"))), + CircuitBreakerOperatorInstrumentation.class.getName() + "$ApplyAdvice"); + } + + public static class ApplyAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after( + @Advice.Return(readOnly = false) Publisher result, + @Advice.FieldValue(value = "circuitBreaker") CircuitBreaker circuitBreaker) { + + result = + ReactorHelper.wrapPublisher( + result, + CircuitBreakerDecorator.DECORATE, + circuitBreaker, + InstrumentationContext.get(Publisher.class, AgentSpan.class)::put); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java new file mode 100644 index 00000000000..fb2a55e5e76 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java @@ -0,0 +1,56 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.github.resilience4j.core.functions.CheckedSupplier; +import java.util.function.Function; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; + +@AutoService(InstrumenterModule.class) +public class FallbackOperatorInstrumentation extends Resilience4jReactorInstrumentation { + + public FallbackOperatorInstrumentation() { + super("resilience4j-fallback"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.reactor.ReactorOperatorFallbackDecorator"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("decorate")) + .and( + takesArgument(0, named("java.util.function.UnaryOperator")) + .and(returns(named("java.util.function.Function")))), + FallbackOperatorInstrumentation.class.getName() + "$DecorateAdvice"); + } + + public static class DecorateAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after( + @Advice.Return(readOnly = false) Function, Publisher> result) { + + result = + ReactorHelper.wrapFunction( + result, InstrumentationContext.get(Publisher.class, AgentSpan.class)::putIfAbsent); + } + + // 2.0.0+ + public static void muzzleCheck(CheckedSupplier cs) throws Throwable { + cs.get(); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java new file mode 100644 index 00000000000..97dc83e0859 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java @@ -0,0 +1,100 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; + +public class ReactorHelper { + + private static final Logger log = LoggerFactory.getLogger(ReactorHelper.class); + + public static Function, Publisher> wrapFunction( + Function, Publisher> operator, + BiConsumer, AgentSpan> attachContext) { + return (value) -> { + AgentSpan current = Resilience4jSpan.current(); + AgentSpan owned = current == null ? Resilience4jSpan.start() : null; + Resilience4jSpanDecorator spanDecorator = Resilience4jSpanDecorator.DECORATE; + if (owned != null) { + current = owned; + spanDecorator.afterStart(current); + } + spanDecorator.decorate(current, null); + try (AgentScope scope = activateSpan(current)) { + Publisher ret = operator.apply(value); + attachContext.accept(ret, current); + if (owned == null) { + return ret; + } + return scheduleOwnedSpanFinish(ret, spanDecorator, owned); + } + }; + } + + public static Publisher wrapPublisher( + Publisher publisher, + Resilience4jSpanDecorator spanDecorator, + T data, + BiConsumer, AgentSpan> attachContext) { + // Create span at construction (needs transformDeferred which is what Spring R4j use) + AgentSpan current = Resilience4jSpan.current(); + AgentSpan owned = current == null ? Resilience4jSpan.start() : null; + if (owned != null) { + current = owned; + spanDecorator.afterStart(current); + } + spanDecorator.decorate(current, data); + + // This schedules a span to be finished when the publisher finishes to be non-zero + Publisher newResult = scheduleOwnedSpanFinish(publisher, spanDecorator, owned); + if (newResult instanceof Scannable) { + Scannable parent = (Scannable) newResult; + while (parent != null) { + if (parent instanceof Publisher) { + // Attach the span to the publisher to be activated by the reactive streams + // instrumentation to scope child spans + attachContext.accept((Publisher) parent, current); + } + parent = parent.scan(Scannable.Attr.PARENT); + } + } + return newResult; + } + + private static Publisher scheduleOwnedSpanFinish( + Publisher publisher, Resilience4jSpanDecorator spanDecorator, AgentSpan owned) { + if (owned == null) { + return publisher; + } + if (publisher instanceof Flux) { + return ((Flux) publisher).doFinally(beforeFinish(spanDecorator, owned)); + } else if (publisher instanceof Mono) { + return ((Mono) publisher).doFinally(beforeFinish(spanDecorator, owned)); + } else { + log.debug("Unexpected type of publisher {}", publisher); + // can't schedule span finish - finish immediately + spanDecorator.beforeFinish(owned); + owned.finish(); + } + return publisher; + } + + private static Consumer beforeFinish( + Resilience4jSpanDecorator spanDecorator, AgentSpan span) { + return signalType -> { + spanDecorator.beforeFinish(span); + span.finish(); + }; + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorInstrumentation.java new file mode 100644 index 00000000000..00b70c87e20 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorInstrumentation.java @@ -0,0 +1,36 @@ +package datadog.trace.instrumentation.resilience4j; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.HashMap; +import java.util.Map; + +/** + * The resilience4j-reactor instrumentations rely on the reactive-streams instrumentation. It + * attaches a r4j span to the publisher, which is then activated downstream. + */ +public abstract class Resilience4jReactorInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + public Resilience4jReactorInstrumentation(String... additionalNames) { + super("resilience4j-reactor", additionalNames); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".Resilience4jSpan", + packageName + ".Resilience4jSpanDecorator", + packageName + ".CircuitBreakerDecorator", + packageName + ".RetryDecorator", + packageName + ".ReactorHelper", + }; + } + + @Override + public Map contextStore() { + final Map ret = new HashMap<>(); + ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName()); + return ret; + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java new file mode 100644 index 00000000000..6e8dab9b528 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java @@ -0,0 +1,50 @@ +package datadog.trace.instrumentation.resilience4j; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.github.resilience4j.retry.Retry; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; + +@AutoService(InstrumenterModule.class) +public class RetryOperatorInstrumentation extends Resilience4jReactorInstrumentation { + + public RetryOperatorInstrumentation() { + super("resilience4j-retry"); + } + + @Override + public String instrumentedType() { + return "io.github.resilience4j.reactor.retry.RetryOperator"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("apply")) + .and(takesArgument(0, named("org.reactivestreams.Publisher"))), + RetryOperatorInstrumentation.class.getName() + "$ApplyAdvice"); + } + + public static class ApplyAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after( + @Advice.Return(readOnly = false) Publisher result, + @Advice.FieldValue(value = "retry") Retry retry) { + + result = + ReactorHelper.wrapPublisher( + result, + RetryDecorator.DECORATE, + retry, + InstrumentationContext.get(Publisher.class, AgentSpan.class)::put); + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/CircuitBreakerTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/CircuitBreakerTest.groovy new file mode 100644 index 00000000000..08f23c6cc88 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/CircuitBreakerTest.groovy @@ -0,0 +1,88 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator +import reactor.core.publisher.ConnectableFlux +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers + +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class CircuitBreakerTest extends InstrumentationSpecification { + + def "test circuit-breaker with Flux"() { + ConnectableFlux connection = Flux.just("foo", "bar") + .transformDeferred(CircuitBreakerOperator.of(CircuitBreaker.ofDefaults("C1"))) + .publishOn(Schedulers.boundedElastic()) + .publish() + + when: + connection.subscribe { + AgentTracer.startSpan("test", it).finish() + } + + runnableUnderTrace("parent", { + connection.connect() + }) + + then: + assertTraces(1) { + trace(4) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + } + span(2) { + operationName "foo" + childOf(span(1)) + errored false + } + span(3) { + operationName "bar" + childOf(span(1)) + errored false + } + } + } + } + + def "test circuit-breaker with Mono"() { + Mono mono = Mono.just("abc") + .transformDeferred(CircuitBreakerOperator.of(CircuitBreaker.ofDefaults("C2"))) + + when: + runnableUnderTrace("parent", { + mono.subscribe { + AgentTracer.startSpan("test", it).finish() + } + }) + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + } + span(2) { + operationName "abc" + childOf(span(1)) + errored false + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/FallbackTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/FallbackTest.groovy new file mode 100644 index 00000000000..bc007f44018 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/FallbackTest.groovy @@ -0,0 +1,155 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import io.github.resilience4j.reactor.ReactorOperatorFallbackDecorator +import io.github.resilience4j.reactor.retry.RetryOperator +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class FallbackTest extends InstrumentationSpecification { + + def "Flux Retry Fallback"() { + setup: + RetryConfig config = RetryConfig.custom() + .retryOnResult(1::equals) // retry when element is greater than 1 + .maxAttempts(2) + .failAfterMaxAttempts(true) + .build() + Retry retry = Retry.of("R0", config) + def fallback = Flux.just(-1, -2).map({ v -> runUnderTrace("in"+v) { v } }) + + def retryOperator = ReactorOperatorFallbackDecorator.decorateRetry(RetryOperator.of(retry), fallback) + Flux flux = Flux + .just(1, 2).map({ v -> runUnderTrace("in"+v) { v } }) + .transformDeferred(retryOperator) + + when: + runnableUnderTrace("parent") { + flux.subscribe(v -> runnableUnderTrace("out" + v) {}) + } + + then: + assertTraces(1) { + trace(11) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "in1" + childOf span(1) + errored false + } + span(3) { + operationName "in1" // retry second attempt + childOf span(1) + errored false + } + span(4) { + operationName "out1" // only one out1 for two in1 attempts + childOf span(1) + errored false + } + span(5) { + operationName "in2" + childOf span(1) + errored false + } + span(6) { + operationName "out2" + childOf span(1) + errored false + } + // fallback elements go after all Flux elements + span(7) { + operationName "in-1" + childOf span(1) + errored false + } + span(8) { + operationName "out-1" + childOf span(1) + errored false + } + span(9) { + operationName "in-2" + childOf span(1) + errored false + } + span(10) { + operationName "out-2" + childOf span(1) + errored false + } + } + } + } + + def "Mono Retry Fallback"() { + setup: + RetryConfig config = RetryConfig.custom() + .retryOnResult(1::equals) // retry when element is "retry" + .maxAttempts(2) + .failAfterMaxAttempts(true) + .build() + Retry retry = Retry.of("R0", config) + def fallback = Mono.just(-1).map({ v -> runUnderTrace("in"+v) { v } }) + + def retryOperator = ReactorOperatorFallbackDecorator.decorateRetry(RetryOperator.of(retry), fallback) + Mono source = Mono + .just(1).map({ v -> runUnderTrace("in"+v) { v } }) + .transformDeferred(retryOperator) + + when: + runnableUnderTrace("parent") { + source.subscribe(v -> runnableUnderTrace("out" + v) {}) + } + + then: + assertTraces(1) { + trace(6) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { // first attempt + operationName "in1" + childOf span(1) + errored false + } + span(3) { // second attempt + operationName "in1" + childOf span(1) + errored false + } + span(4) {// fallback + operationName "in-1" + childOf span(1) + errored false + } + span(5) { + operationName "out-1" + childOf span(1) + errored false + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/RetryTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/RetryTest.groovy new file mode 100644 index 00000000000..4a865eef16a --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/RetryTest.groovy @@ -0,0 +1,75 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.github.resilience4j.reactor.retry.RetryOperator +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import reactor.core.publisher.ConnectableFlux +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class RetryTest extends InstrumentationSpecification { + + def "decorateCompletionStage retry twice on error"() { + setup: + ConnectableFlux connection = Flux.just("abc") + .map({ serviceCallErr(it, new IllegalStateException("error"))}) + .transformDeferred(RetryOperator.of(Retry.of("R0", RetryConfig.custom().maxAttempts(2).build()))) + .publishOn(Schedulers.boundedElastic()) + .publish() + + when: + connection.subscribe { + // won't show up because of errors upstream + runnableUnderTrace("child-" + it) {} + } + + runnableUnderTrace("parent") { + connection.connect() + } + + then: + assertTraces(1) { + trace(4) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCallErr/abc" + childOf span(1) + errored false + } + // second attempt span under the retry span + span(3) { + operationName "serviceCallErr/abc" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value) { + AgentTracer.startSpan("test", "serviceCall/$value").finish() + value + } + + void serviceCallErr(String value, IllegalStateException e) { + def span = AgentTracer.startSpan("test", "serviceCallErr/$value") + def scope = AgentTracer.activateSpan(span) + try { + throw e + } finally { + scope.close() + span.finish() + } + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/SpanDecoratorsForkedTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/SpanDecoratorsForkedTest.groovy new file mode 100644 index 00000000000..735b7b5fefe --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/SpanDecoratorsForkedTest.groovy @@ -0,0 +1,155 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.bootstrap.instrumentation.api.Tags +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator +import io.github.resilience4j.reactor.retry.RetryOperator +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import reactor.core.publisher.Flux + +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class SpanDecoratorsForkedTest extends InstrumentationSpecification { + + def "decorate span with circuit-breaker"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_MEASURED_ENABLED, measuredEnabled.toString()) + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_TAG_METRICS_ENABLED, tagMetricsEnabled.toString()) + + def ms = Mock(CircuitBreaker.Metrics) + + def cb = Mock(CircuitBreaker) + cb.getName() >> "cb1" + cb.getState() >> CircuitBreaker.State.CLOSED + cb.tryAcquirePermission() >> true + cb.getMetrics() >> ms + ms.getFailureRate() >> 0.1f + ms.getSlowCallRate() >> 0.2f + ms.getNumberOfBufferedCalls() >> 12 + ms.getNumberOfFailedCalls() >> 13 + ms.getNumberOfNotPermittedCalls() >> 2 + ms.getNumberOfSlowCalls() >> 23 + ms.getNumberOfSlowFailedCalls() >> 3 + ms.getNumberOfSlowSuccessfulCalls() >> 33 + ms.getNumberOfSuccessfulCalls() >> 50 + + Flux flux = Flux.just("foo", "bar") + .transformDeferred(CircuitBreakerOperator.of(cb)) + + when: + runnableUnderTrace("parent", { + flux.subscribe() + }) + + then: + assertTraces(1) { + trace(2) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + tags { + "$Tags.COMPONENT" "resilience4j" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_INTERNAL + "resilience4j.circuit_breaker.name" "cb1" + "resilience4j.circuit_breaker.state" "CLOSED" + "resilience4j.circuit-breaker.metrics.failure_rate" 0.1f + "resilience4j.circuit-breaker.metrics.slow_call_rate" 0.2f + "resilience4j.circuit-breaker.metrics.buffered_calls" 12 + "resilience4j.circuit-breaker.metrics.failed_calls" 13 + "resilience4j.circuit-breaker.metrics.not_permitted_calls" 2 + "resilience4j.circuit-breaker.metrics.slow_calls" 23 + "resilience4j.circuit-breaker.metrics.slow_failed_calls" 3 + "resilience4j.circuit-breaker.metrics.slow_successful_calls" 33 + "resilience4j.circuit-breaker.metrics.successful_calls" 50 + defaultTags() + } + } + } + } + + where: + measuredEnabled | tagMetricsEnabled + true | true + false | false + true | false + false | true + } + + def "decorate span with retry"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_MEASURED_ENABLED, measuredEnabled.toString()) + injectSysConfig(TraceInstrumentationConfig.RESILIENCE4J_TAG_METRICS_ENABLED, tagMetricsEnabled.toString()) + + def ms = Mock(Retry.Metrics) + def rc = Mock(RetryConfig) + def rt = Mock(Retry) + def cx = Mock(Retry.Context) + rt.getName() >> "rt1" + rt.getRetryConfig() >> rc + rt.getMetrics() >> ms + rt.context() >> cx + rc.getMaxAttempts() >> 23 + rc.isFailAfterMaxAttempts() >> true + ms.getNumberOfFailedCallsWithoutRetryAttempt() >> 1 + ms.getNumberOfFailedCallsWithRetryAttempt() >> 2 + ms.getNumberOfSuccessfulCallsWithoutRetryAttempt() >> 3 + ms.getNumberOfSuccessfulCallsWithRetryAttempt() >> 4 + + when: + Flux connection = Flux.just("abc") + .map({ serviceCall(it)}) + .transformDeferred(RetryOperator.of(rt)) + .publish() + + runnableUnderTrace("parent") { + connection.connect() + } + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span(0) { + operationName "parent" + errored false + } + span(1) { + operationName "resilience4j" + childOf(span(0)) + errored false + tags { + "$Tags.COMPONENT" "resilience4j" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_INTERNAL + "resilience4j.retry.name" "rt1" + "resilience4j.retry.max_attempts" 23 + "resilience4j.retry.fail_after_max_attempts" true + "resilience4j.retry.metrics.failed_without_retry" 1 + "resilience4j.retry.metrics.failed_with_retry" 2 + "resilience4j.retry.metrics.success_without_retry" 3 + "resilience4j.retry.metrics.success_with_retry" 4 + defaultTags() + } + } + span(2) { + operationName "serviceCall/abc" + childOf span(1) + errored false + } + } + } + + where: + measuredEnabled | tagMetricsEnabled + true | true + false | false + true | false + false | true + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/StackedOperatorsTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/StackedOperatorsTest.groovy new file mode 100644 index 00000000000..22e4a241467 --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/StackedOperatorsTest.groovy @@ -0,0 +1,77 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator +import io.github.resilience4j.reactor.retry.RetryOperator +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import reactor.core.publisher.ConnectableFlux +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers + +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class StackedOperatorsTest extends InstrumentationSpecification { + + def "test stacked operators retry(circuitbreaker)"() { + setup: + ConnectableFlux connection = Flux + .just("abc", "def") + .map({ serviceCall(it)}) + .transformDeferred(CircuitBreakerOperator.of(CircuitBreaker.ofDefaults("C2"))) + .transformDeferred(RetryOperator.of(Retry.of("R1", RetryConfig.custom().maxAttempts(3).build()))) + .publishOn(Schedulers.boundedElastic()) + .publish() + + when: + connection.subscribe { + runnableUnderTrace("child-" + it) {} + } + + runnableUnderTrace("parent", { + connection.connect() + }) + + then: + assertTraces(1) { + trace(6) { + sortSpansByStart() + span(0) { + operationName "parent" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "serviceCall/abc" + childOf span(1) + errored false + } + span(3) { + operationName "serviceCall/def" + childOf span(1) + errored false + } + span(4) { + operationName "child-abc" + childOf span(1) + errored false + } + span(5) { + operationName "child-def" + childOf span(1) + errored false + } + } + } + } + + def T serviceCall(T value) { + AgentTracer.startSpan("test", "serviceCall/$value").finish() + value + } +} diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/TraceCapturingTest.groovy b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/TraceCapturingTest.groovy new file mode 100644 index 00000000000..299a9a574be --- /dev/null +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2/src/test/groovy/TraceCapturingTest.groovy @@ -0,0 +1,284 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import io.github.resilience4j.circuitbreaker.CircuitBreaker +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator +import reactor.core.publisher.ConnectableFlux +import reactor.core.publisher.Flux + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace + +class TraceCapturingTest extends InstrumentationSpecification { + + def "cold publisher"() { + def cb1 = CircuitBreaker.ofDefaults("cb1") + def flux = runUnderTrace("init") { + Flux.range(1, 2) + .map { + v -> runUnderTrace("in" + v) { + return v + } + } + .transformDeferred(CircuitBreakerOperator.of(cb1)) + } + + when: + (1..2).each { + runUnderTrace("sub" + it) { + flux.subscribe(v -> runnableUnderTrace("out" + v) {}) + } + } + + then: + assertTraces(3) { + trace(1) { + span(0) { + operationName "init" + errored false + } + } + trace(6) { + sortSpansByStart() + span(0) { + operationName "sub1" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "in1" + childOf span(1) + errored false + } + span(3) { + operationName "out1" + childOf span(1) + errored false + } + span(4) { + operationName "in2" + childOf span(1) + errored false + } + span(5) { + operationName "out2" + childOf span(1) + errored false + } + } + trace(6) { + sortSpansByStart() + span(0) { + operationName "sub2" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "in1" + childOf span(1) + errored false + } + span(3) { + operationName "out1" + childOf span(1) + errored false + } + span(4) { + operationName "in2" + childOf span(1) + errored false + } + span(5) { + operationName "out2" + childOf span(1) + errored false + } + } + } + } + + def "hot publisher with connect/replay"() { + def cb1 = CircuitBreaker.ofDefaults("cb1") + ConnectableFlux conn = runUnderTrace("init") { + Flux.range(1, 2) + .map { + v -> runUnderTrace("in" + v) { + return v + } + } + .transformDeferred(CircuitBreakerOperator.of(cb1)) + .publish() // or replay() + } + + when: + (1..2).each { + runUnderTrace("sub" + it) { + conn.subscribe(v -> runnableUnderTrace("out" + v) {}) + } + } + runnableUnderTrace("conn", { + conn.connect() + }) + + then: + assertTraces(4) { + trace(1) { + span(0) { + operationName "init" + errored false + } + } + trace(1) { + span(0) { + operationName "sub1" + errored false + } + } + trace(1) { + span(0) { + operationName "sub2" + errored false + } + } + trace(8) { + sortSpansByStart() + span(0) { + operationName "conn" + parent() + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "in1" + childOf span(1) + errored false + } + span(3) { + operationName "out1" + childOf span(1) + errored false + } + span(4) { + operationName "out1" + childOf span(1) + errored false + } + span(5) { + operationName "in2" + childOf span(1) + errored false + } + span(6) { + operationName "out2" + childOf span(1) + errored false + } + span(7) { + operationName "out2" + childOf span(1) + errored false + } + } + } + } + + def "hot publisher with autoConnect/refCount -- r4j spans connect to the subscriber that triggered auto-connect"() { + def cb1 = CircuitBreaker.ofDefaults("cb1") + Flux flux = runUnderTrace("init") { + Flux.range(1, 2) + .map { + v -> runUnderTrace("in" + v) { + return v + } + } + .transformDeferred(CircuitBreakerOperator.of(cb1)) + .publish() + .autoConnect(2) // refCount + } + + when: + (1..3).each { + runUnderTrace("sub" + it) { + flux.subscribe(v -> runnableUnderTrace("out" + v) { + }) + } + } + + then: + assertTraces(4) { + sortSpansByStart() + trace(1) { + span(0) { + operationName "init" + errored false + } + } + trace(1) { + span(0) { + operationName "sub1" + errored false + } + } + trace(8) { + span(0) { + operationName "sub2" + errored false + } + span(1) { + operationName "resilience4j" + childOf span(0) + errored false + } + span(2) { + operationName "in1" + childOf span(1) + errored false + } + span(3) { + operationName "out1" + childOf span(1) + errored false + } + span(4) { + operationName "out1" + childOf span(1) + errored false + } + span(5) { + operationName "in2" + childOf span(1) + errored false + } + span(6) { + operationName "out2" + childOf span(1) + errored false + } + span(7) { + operationName "out2" + childOf span(1) + errored false + } + } + trace(1) { + span(0) { + operationName "sub3" + errored false + } + } + } + } +} + diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index c2ab4e85393..195f7a43383 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -128,6 +128,9 @@ public final class TraceInstrumentationConfig { public static final String HYSTRIX_TAGS_ENABLED = "hystrix.tags.enabled"; public static final String HYSTRIX_MEASURED_ENABLED = "hystrix.measured.enabled"; + public static final String RESILIENCE4J_MEASURED_ENABLED = "resilience4j.measured.enabled"; + public static final String RESILIENCE4J_TAG_METRICS_ENABLED = "resilience4j.tag-metrics.enabled"; + public static final String IGNITE_CACHE_INCLUDE_KEYS = "ignite.cache.include_keys"; public static final String OBFUSCATION_QUERY_STRING_REGEXP = diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index aef84a8c58e..792ccdbaeea 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -535,6 +535,8 @@ import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_INCLUDE_ROUTINGKEY_IN_RESOURCE; import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_EXCHANGES; import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_QUEUES; +import static datadog.trace.api.config.TraceInstrumentationConfig.RESILIENCE4J_MEASURED_ENABLED; +import static datadog.trace.api.config.TraceInstrumentationConfig.RESILIENCE4J_TAG_METRICS_ENABLED; import static datadog.trace.api.config.TraceInstrumentationConfig.SERVLET_ASYNC_TIMEOUT_ERROR; import static datadog.trace.api.config.TraceInstrumentationConfig.SERVLET_PRINCIPAL_ENABLED; import static datadog.trace.api.config.TraceInstrumentationConfig.SERVLET_ROOT_CONTEXT_SERVICE_NAME; @@ -1122,6 +1124,9 @@ public static String getHostName() { private final boolean hystrixTagsEnabled; private final boolean hystrixMeasuredEnabled; + private final boolean resilience4jMeasuredEnabled; + private final boolean resilience4jTagMetricsEnabled; + private final boolean igniteCacheIncludeKeys; private final String obfuscationQueryRegexp; @@ -2546,6 +2551,10 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) hystrixTagsEnabled = configProvider.getBoolean(HYSTRIX_TAGS_ENABLED, false); hystrixMeasuredEnabled = configProvider.getBoolean(HYSTRIX_MEASURED_ENABLED, false); + resilience4jMeasuredEnabled = configProvider.getBoolean(RESILIENCE4J_MEASURED_ENABLED, false); + resilience4jTagMetricsEnabled = + configProvider.getBoolean(RESILIENCE4J_TAG_METRICS_ENABLED, false); + igniteCacheIncludeKeys = configProvider.getBoolean(IGNITE_CACHE_INCLUDE_KEYS, false); obfuscationQueryRegexp = @@ -4280,6 +4289,14 @@ public boolean isHystrixMeasuredEnabled() { return hystrixMeasuredEnabled; } + public boolean isResilience4jMeasuredEnabled() { + return resilience4jMeasuredEnabled; + } + + public boolean isResilience4jTagMetricsEnabled() { + return resilience4jTagMetricsEnabled; + } + public boolean isIgniteCacheIncludeKeys() { return igniteCacheIncludeKeys; } @@ -5692,6 +5709,10 @@ public String toString() { + hystrixTagsEnabled + ", hystrixMeasuredEnabled=" + hystrixMeasuredEnabled + + ", resilience4jMeasuredEnable=" + + resilience4jMeasuredEnabled + + ", resilience4jTagMetricsEnabled=" + + resilience4jTagMetricsEnabled + ", igniteCacheIncludeKeys=" + igniteCacheIncludeKeys + ", servletPrincipalEnabled=" diff --git a/settings.gradle.kts b/settings.gradle.kts index 0db78b03163..ab24c258988 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -504,6 +504,8 @@ include( ":dd-java-agent:instrumentation:reactor-netty-1", ":dd-java-agent:instrumentation:rediscala-1.8", ":dd-java-agent:instrumentation:renaissance", + ":dd-java-agent:instrumentation:resilience4j:resilience4j-2", + ":dd-java-agent:instrumentation:resilience4j:resilience4j-reactor-2", ":dd-java-agent:instrumentation:resteasy-appsec", ":dd-java-agent:instrumentation:restlet-2.2", ":dd-java-agent:instrumentation:rmi",