19
19
import java .util .ArrayList ;
20
20
import java .util .Arrays ;
21
21
import java .util .List ;
22
+ import java .util .Map ;
22
23
import java .util .concurrent .CountDownLatch ;
23
24
import java .util .concurrent .TimeUnit ;
24
25
import java .util .concurrent .atomic .AtomicInteger ;
37
38
import io .micrometer .tracing .handler .PropagatingSenderTracingObservationHandler ;
38
39
import io .micrometer .tracing .propagation .Propagator ;
39
40
import io .micrometer .tracing .test .simple .SimpleTracer ;
41
+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
40
42
import org .apache .kafka .clients .consumer .ConsumerRecord ;
41
43
import org .jspecify .annotations .Nullable ;
42
44
import org .junit .jupiter .api .Test ;
64
66
* Tests for batch individual record observation functionality.
65
67
*
66
68
* @author Igor Quintanilha
69
+ * @author Artem Bilan
70
+ *
67
71
* @since 4.0
68
72
*/
69
73
@ SpringJUnitConfig
@@ -78,8 +82,8 @@ public class BatchIndividualRecordObservationTests {
78
82
79
83
@ Test
80
84
void batchIndividualRecordObservationCreatesObservationPerRecord (@ Autowired BatchListener listener ,
81
- @ Autowired KafkaTemplate <Integer , String > template , @ Autowired TestObservationHandler observationHandler ,
82
- @ Autowired SimpleTracer tracer )
85
+ @ Autowired KafkaTemplate <Integer , String > template , @ Autowired TestObservationHandler observationHandler ,
86
+ @ Autowired SimpleTracer tracer )
83
87
throws InterruptedException {
84
88
85
89
// Clear any existing observations and spans
@@ -147,8 +151,9 @@ void batchIndividualRecordObservationCreatesObservationPerRecord(@Autowired Batc
147
151
}
148
152
149
153
@ Test
150
- void batchIndividualRecordObservationDisabledCreatesNoIndividualObservations (@ Autowired BatchListenerWithoutIndividualObservation batchListener ,
151
- @ Autowired KafkaTemplate <Integer , String > template , @ Autowired TestObservationHandler observationHandler )
154
+ void batchIndividualRecordObservationDisabledCreatesNoIndividualObservations (
155
+ @ Autowired BatchListenerWithoutIndividualObservation batchListener ,
156
+ @ Autowired KafkaTemplate <Integer , String > template , @ Autowired TestObservationHandler observationHandler )
152
157
throws InterruptedException {
153
158
154
159
// Clear any existing observations
@@ -181,12 +186,16 @@ ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
181
186
182
187
@ Bean
183
188
ConsumerFactory <Integer , String > consumerFactory (EmbeddedKafkaBroker broker ) {
184
- return new DefaultKafkaConsumerFactory <>(
185
- KafkaTestUtils .consumerProps (broker , "batch-tests" , false ));
189
+ Map <String , Object > configs = KafkaTestUtils .consumerProps (broker , "batch-tests" , false );
190
+ configs .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 54 );
191
+ configs .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG , 1000 );
192
+ return new DefaultKafkaConsumerFactory <>(configs );
186
193
}
187
194
188
195
@ Bean
189
- KafkaTemplate <Integer , String > template (ProducerFactory <Integer , String > pf , ObservationRegistry observationRegistry ) {
196
+ KafkaTemplate <Integer , String > template (ProducerFactory <Integer , String > pf ,
197
+ ObservationRegistry observationRegistry ) {
198
+
190
199
KafkaTemplate <Integer , String > template = new KafkaTemplate <>(pf );
191
200
template .setObservationEnabled (true );
192
201
template .setObservationRegistry (observationRegistry );
@@ -196,6 +205,7 @@ KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf, Obs
196
205
@ Bean
197
206
ConcurrentKafkaListenerContainerFactory <Integer , String > kafkaListenerContainerFactory (
198
207
ConsumerFactory <Integer , String > cf ) {
208
+
199
209
ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
200
210
new ConcurrentKafkaListenerContainerFactory <>();
201
211
factory .setConsumerFactory (cf );
@@ -208,6 +218,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
208
218
@ Bean
209
219
ConcurrentKafkaListenerContainerFactory <Integer , String > observationListenerContainerFactory (
210
220
ConsumerFactory <Integer , String > cf , ObservationRegistry observationRegistry ) {
221
+
211
222
ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
212
223
new ConcurrentKafkaListenerContainerFactory <>();
213
224
factory .setConsumerFactory (cf );
@@ -221,6 +232,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> observationListenerCont
221
232
@ Bean
222
233
ConcurrentKafkaListenerContainerFactory <Integer , String > batchOnlyObservationListenerContainerFactory (
223
234
ConsumerFactory <Integer , String > cf , ObservationRegistry observationRegistry ) {
235
+
224
236
ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
225
237
new ConcurrentKafkaListenerContainerFactory <>();
226
238
factory .setConsumerFactory (cf );
@@ -283,7 +295,9 @@ public <C> Span.Builder extract(C carrier, Getter<C> getter) {
283
295
}
284
296
285
297
@ Bean
286
- ObservationRegistry observationRegistry (Tracer tracer , Propagator propagator , MeterRegistry meterRegistry , TestObservationHandler testObservationHandler ) {
298
+ ObservationRegistry observationRegistry (Tracer tracer , Propagator propagator , MeterRegistry meterRegistry ,
299
+ TestObservationHandler testObservationHandler ) {
300
+
287
301
ObservationRegistry observationRegistry = ObservationRegistry .create ();
288
302
observationRegistry .observationConfig ()
289
303
.observationHandler (
@@ -309,34 +323,43 @@ BatchListener batchListener() {
309
323
BatchListenerWithoutIndividualObservation batchListenerWithoutIndividualObservation () {
310
324
return new BatchListenerWithoutIndividualObservation ();
311
325
}
326
+
312
327
}
313
328
314
329
static class BatchListener {
330
+
315
331
final CountDownLatch latch = new CountDownLatch (1 );
316
332
317
333
final List <String > processedRecords = new ArrayList <>();
318
334
319
- @ KafkaListener (topics = BATCH_INDIVIDUAL_OBSERVATION_TOPIC , containerFactory = "observationListenerContainerFactory" )
335
+ @ KafkaListener (topics = BATCH_INDIVIDUAL_OBSERVATION_TOPIC ,
336
+ containerFactory = "observationListenerContainerFactory" )
320
337
public void listen (List <ConsumerRecord <Integer , String >> records ) {
338
+
321
339
for (ConsumerRecord <Integer , String > record : records ) {
322
340
processedRecords .add (record .value ());
323
341
}
324
342
latch .countDown ();
325
343
}
344
+
326
345
}
327
346
328
347
static class BatchListenerWithoutIndividualObservation {
348
+
329
349
final CountDownLatch latch = new CountDownLatch (1 );
330
350
331
351
final List <String > processedRecords = new ArrayList <>();
332
352
333
- @ KafkaListener (topics = BATCH_ONLY_OBSERVATION_TOPIC , containerFactory = "batchOnlyObservationListenerContainerFactory" )
353
+ @ KafkaListener (topics = BATCH_ONLY_OBSERVATION_TOPIC ,
354
+ containerFactory = "batchOnlyObservationListenerContainerFactory" )
355
+
334
356
public void listen (List <ConsumerRecord <Integer , String >> records ) {
335
357
for (ConsumerRecord <Integer , String > record : records ) {
336
358
processedRecords .add (record .value ());
337
359
}
338
360
latch .countDown ();
339
361
}
362
+
340
363
}
341
364
342
365
static class TestObservationHandler implements ObservationHandler <Observation .Context > {
@@ -373,6 +396,7 @@ public int getStartedObservations() {
373
396
public void clear () {
374
397
startedObservations .set (0 );
375
398
}
399
+
376
400
}
377
401
378
402
}
0 commit comments