Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<spotbugs.fail>false</spotbugs.fail>
<opentelemetry.version>0.14.0</opentelemetry.version>
<opentelemetry-sdk-metrics.version>1.41.0</opentelemetry-sdk-metrics.version>
<zipkin.version>2.16.3</zipkin.version>
<opentelemetry.version>1.41.0</opentelemetry.version>
<zipkin.version>3.4.0</zipkin.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -81,19 +80,24 @@
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-zipkin</artifactId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<version>${opentelemetry-sdk-metrics.version}</version>
<artifactId>opentelemetry-exporter-zipkin</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-urlconnection</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -59,18 +60,23 @@ public Tracer initTracer(@Autowired OpenTelemetry openTelemetry) {
* Creates an opentelemetry instance.
* @return OpenTelemetry.
*/
public static OpenTelemetry createOpenTelemetry() {
public static OpenTelemetrySdk createOpenTelemetry() {
// Only exports to Zipkin if it is up. Otherwise, ignore it.
// This is helpful to avoid exceptions for examples that do not require Zipkin.
if (isZipkinUp()) {
Resource serviceResource = Resource.getDefault()
.toBuilder()
.put("service.name", InvokeClient.class.getName()) // Use ResourceAttributes constant
.build();
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);

ZipkinSpanExporter zipkinExporter =
ZipkinSpanExporter.builder()
.setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
.setServiceName(InvokeClient.class.getName())
.build();

SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setResource(serviceResource)
.addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
.build();

Expand Down Expand Up @@ -100,7 +106,7 @@ public static reactor.util.context.ContextView getReactorContext() {
*/
public static reactor.util.context.Context getReactorContext(Context context) {
Map<String, String> map = new HashMap<>();
TextMapPropagator.Setter<Map<String, String>> setter =
TextMapSetter<Map<String, String>> setter =
(carrier, key, value) -> map.put(key, value);

GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.http.HttpServletRequest;
Expand All @@ -34,8 +35,8 @@ public class OpenTelemetryInterceptor implements HandlerInterceptor {
@Autowired
private OpenTelemetry openTelemetry;

private static final TextMapPropagator.Getter<HttpServletRequest> HTTP_SERVLET_REQUEST_GETTER =
new TextMapPropagator.Getter<>() {
private static final TextMapGetter<HttpServletRequest> HTTP_SERVLET_REQUEST_GETTER =
new TextMapGetter<>() {
@Override
public Iterable<String> keys(HttpServletRequest carrier) {
return Collections.list(carrier.getHeaderNames());
Expand Down
30 changes: 19 additions & 11 deletions examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
Expand Down Expand Up @@ -55,28 +55,41 @@ public class BulkPublisher {
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetrySdk.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan();

try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;

c.waitForSidecar(10000);

try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");

List<String> messages = new ArrayList<>();

System.out.println("Constructing the list of messages to publish");

for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);

messages.add(message);

System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)

BulkPublishResponse<?> res = client
.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.contextWrite(getReactorContext()).block();

System.out.println("Published the set of messages in a single call to Dapr");

if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");

for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
Expand All @@ -86,16 +99,11 @@ public static void main(String[] args) throws Exception {
throw new Exception("null response from dapr");
}
}
// Close the span.

span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();

openTelemetrySdk.getSdkTracerProvider().shutdown();
System.out.println("Done");
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
Expand Down Expand Up @@ -51,39 +51,36 @@ public class PublisherWithTracing {
* @throws Exception A startup Exception.
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(PublisherWithTracing.class.getCanonicalName());
Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetrySdk.getTracer(PublisherWithTracing.class.getCanonicalName());
Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan();

try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);

// Publishing messages, notice the use of subscriberContext() for tracing.
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
message).contextWrite(getReactorContext()).block();

System.out.println("Published message: " + message);

try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();

return;
}
}
}

// Close the span.
span.end();

// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();

// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
openTelemetrySdk.getSdkTracerProvider().shutdown();
System.out.println("Done.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.examples.OpenTelemetryConfig;
import io.dapr.utils.TypeRef;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
Expand Down Expand Up @@ -48,10 +48,10 @@ public class InvokeClient {
* @param args Messages to be sent as request for the invoke API.
*/
public static void main(String[] args) throws Exception {
final OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
final Tracer tracer = openTelemetry.getTracer(InvokeClient.class.getCanonicalName());
OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetrySdk.getTracer(InvokeClient.class.getCanonicalName());
Span span = tracer.spanBuilder("Example's Main").setSpanKind(SpanKind.CLIENT).startSpan();

Span span = tracer.spanBuilder("Example's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = (new DaprClientBuilder()).build()) {
for (String message : args) {
try (Scope scope = span.makeCurrent()) {
Expand All @@ -70,15 +70,11 @@ public static void main(String[] args) throws Exception {
}).contextWrite(getReactorContext()).block();
}
}
}
span.end();
shutdown();
System.out.println("Done");
}

private static void shutdown() throws Exception {
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate();
span.end();
openTelemetrySdk.getSdkTracerProvider().shutdown();
Validation.validate();
System.out.println("Done");
}
}

}
Loading