
Commit 1233e24

Refactor BatchLogRecordProcessor
1 parent b299c7b commit 1233e24


2 files changed: +24, -31 lines changed


opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 9 additions & 12 deletions
@@ -205,13 +205,11 @@ def __init__(
         BatchLogRecordProcessor._validate_arguments(
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
-
         self._exporter = exporter
         self._max_queue_size = max_queue_size
         self._schedule_delay = schedule_delay_millis / 1e3
         self._max_export_batch_size = max_export_batch_size
         # Not used. No way currently to pass timeout to export.
-        # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
         self._export_timeout_millis = export_timeout_millis
         # Deque is thread safe.
         self._queue = collections.deque([], max_queue_size)
@@ -220,10 +218,9 @@ def __init__(
             target=self.worker,
             daemon=True,
         )
-
         self._shutdown = False
         self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
+        self._worker_sleep = threading.Event()
         self._worker_thread.start()
         if hasattr(os, "register_at_fork"):
             weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
@@ -238,15 +235,15 @@ def _should_export_batch(
         # Always continue to export while queue length exceeds max batch size.
         if len(self._queue) >= self._max_export_batch_size:
             return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
+        if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
             return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
+        if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
             return num_iterations == 0
         return False
 
     def _at_fork_reinit(self):
         self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
+        self._worker_sleep = threading.Event()
         self._queue.clear()
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
@@ -261,15 +258,15 @@ def worker(self):
             # Lots of strategies in the spec for setting next timeout.
             # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
             # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
-            sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
+            sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
             if self._shutdown:
                 break
             self._export(
                 BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
                 if sleep_interrupted
                 else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
             )
-            self._worker_awaken.clear()
+            self._worker_sleep.clear()
         self._export(BatchLogExportStrategy.EXPORT_ALL)
 
     def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
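The timed wait in the worker is the stdlib interruptible-sleep pattern: Event.wait(timeout) returns True when another thread calls set() (emit() on a full batch, or shutdown()) and False when the schedule delay elapses. A minimal sketch with illustrative names, not SDK code:

    import threading
    import time

    wakeup = threading.Event()

    def waiter():
        # True means another thread called wakeup.set(); False means the timeout elapsed.
        interrupted = wakeup.wait(timeout=5.0)
        print("interrupted" if interrupted else "timed out")

    worker = threading.Thread(target=waiter, daemon=True)
    worker.start()
    time.sleep(0.1)
    wakeup.set()  # wakes the waiter early, as emit()/shutdown() do for the processor's worker
    worker.join()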
@@ -299,7 +296,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
 
     def emit(self, log_data: LogData) -> None:
         if self._shutdown:
-            _logger.info("Shutdown called, ignoring log.")
+            _logger.warning("Shutdown called, ignoring log.")
             return
         if self._pid != os.getpid():
             _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
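The info-to-warning promotion matters because, under Python's default logging configuration, the effective level is WARNING, so the dropped-log message was previously invisible unless the application lowered the level. A minimal sketch with a hypothetical logger name, not the SDK's:

    import logging

    logger = logging.getLogger("example")  # hypothetical logger name
    logger.info("Shutdown called, ignoring log.")     # suppressed under the default WARNING level
    logger.warning("Shutdown called, ignoring log.")  # emitted under the default configuration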
@@ -308,15 +305,15 @@ def emit(self, log_data: LogData) -> None:
             _logger.warning("Queue full, dropping log.")
         self._queue.appendleft(log_data)
         if len(self._queue) >= self._max_export_batch_size:
-            self._worker_awaken.set()
+            self._worker_sleep.set()
 
     def shutdown(self):
         if self._shutdown:
             return
         # Prevents emit and force_flush from further calling export.
         self._shutdown = True
         # Interrupts sleep in the worker, if it's sleeping.
-        self._worker_awaken.set()
+        self._worker_sleep.set()
         # Main worker loop should exit after one final export call with flush all strategy.
         self._worker_thread.join()
         self._exporter.shutdown()
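For context on emit(): the queue is a bounded collections.deque, and once maxlen is reached each appendleft() silently discards an element from the opposite end, which is why the processor warns about dropping a log first. A minimal stdlib sketch, not SDK code:

    import collections

    queue = collections.deque([], 3)  # maxlen=3, same shape as the processor's self._queue
    for item in ("a", "b", "c", "d"):
        if len(queue) == queue.maxlen:
            print("Queue full, dropping log.")  # mirrors the warning in emit()
        queue.appendleft(item)

    print(list(queue))  # ['d', 'c', 'b'] -- the oldest item 'a' was silently dropped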

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 15 additions & 19 deletions
@@ -21,7 +21,7 @@
 import unittest
 import weakref
 from concurrent.futures import ThreadPoolExecutor
-from unittest.mock import Mock, patch
+from unittest.mock import Mock, call, patch
 
 from opentelemetry._logs import SeverityNumber
 from opentelemetry.sdk import trace
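The new call import supports assert_has_calls() in the rewritten schedule-delay test: it takes a list of call() objects describing the expected invocations. A minimal sketch of that idiom (illustrative payloads, not the test itself):

    from unittest.mock import Mock, call

    exporter = Mock()
    exporter.export(["log-1"])
    exporter.export(["log-2"])

    # Passes if the mock saw these calls in this order; other calls may surround them.
    exporter.export.assert_has_calls([call(["log-1"]), call(["log-2"])])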
@@ -46,7 +46,6 @@
 )
 from opentelemetry.sdk.resources import Resource as SDKResource
 from opentelemetry.sdk.util.instrumentation import InstrumentationScope
-from opentelemetry.test.concurrency_test import ConcurrencyTestBase
 from opentelemetry.trace import TraceFlags
 from opentelemetry.trace.span import INVALID_SPAN_CONTEXT
 

@@ -487,20 +486,24 @@ def test_logs_exported_once_batch_size_reached(self):
487486
exporter.export.assert_called_once()
488487
after_export = time.time_ns()
489488
# Shows the worker's 30 second sleep was interrupted within a second.
490-
self.assertLess(after_export - before_export, 1e9)
489+
self.assertTrue((after_export - before_export) < 1e9)
491490

492491
# pylint: disable=no-self-use
493492
def test_logs_exported_once_schedule_delay_reached(self):
494493
exporter = Mock()
495494
log_record_processor = BatchLogRecordProcessor(
496495
exporter=exporter,
496+
# Should not reach this during the test, instead export should be called when delay millis is hit.
497497
max_queue_size=15,
498498
max_export_batch_size=15,
499499
schedule_delay_millis=100,
500500
)
501-
log_record_processor.emit(EMPTY_LOG)
502-
time.sleep(0.2)
503-
exporter.export.assert_called_once_with([EMPTY_LOG])
501+
for _ in range(15):
502+
log_record_processor.emit(EMPTY_LOG)
503+
time.sleep(0.11)
504+
exporter.export.assert_has_calls(
505+
[call([EMPTY_LOG]) for _ in range(15)]
506+
)
504507

505508
def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
506509
exporter = Mock()
@@ -517,13 +520,13 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
         exporter.export.assert_called_once_with([EMPTY_LOG])
         self.assertTrue(exporter._stopped)
 
-        with self.assertLogs(level="INFO") as log:
+        with self.assertLogs(level="WARNING") as log:
             # This log should not be flushed.
             log_record_processor.emit(EMPTY_LOG)
             self.assertEqual(len(log.output), 1)
             self.assertEqual(len(log.records), 1)
             self.assertIn("Shutdown called, ignoring log.", log.output[0])
-        exporter.export.assert_called_once()
+        exporter.export.assert_called_once_with([EMPTY_LOG])
 
     # pylint: disable=no-self-use
     def test_force_flush_flushes_logs(self):
@@ -552,7 +555,6 @@ def bulk_log_and_flush(num_logs):
         with ThreadPoolExecutor(max_workers=69) as executor:
             for idx in range(69):
                 executor.submit(bulk_log_and_flush, idx + 1)
-
             executor.shutdown()
 
         finished_logs = exporter.get_finished_logs()
@@ -562,20 +564,16 @@ def bulk_log_and_flush(num_logs):
         hasattr(os, "fork"),
         "needs *nix",
     )
-    def test_batch_log_record_processor_fork_clears_logs_from_child(self):
+    def test_batch_log_record_processor_fork(self):
         exporter = InMemoryLogExporter()
         log_record_processor = BatchLogRecordProcessor(
             exporter,
             max_export_batch_size=64,
             schedule_delay_millis=30000,
         )
-        # These logs should be flushed only from the parent process.
-        # _at_fork_reinit should be called in the child process, to
-        # clear these logs in the child process.
+        # These are not expected to be flushed. Calling fork clears any logs not flushed.
         for _ in range(10):
             log_record_processor.emit(EMPTY_LOG)
-
-        # The below test also needs this, but it can only be set once.
         multiprocessing.set_start_method("fork")
 
         def child(conn):
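The renamed fork test exercises the processor's at-fork hook: a callback registered with os.register_at_fork runs in the child after fork() and can drop state inherited from the parent, which is why the parent's unflushed logs never show up in the child. A minimal stdlib sketch with illustrative names, assuming a *nix platform:

    import os

    state = {"queue": ["inherited-log"]}

    def _reinit_in_child():
        # Analogous to _at_fork_reinit() clearing self._queue in the child process.
        state["queue"].clear()

    if hasattr(os, "register_at_fork"):  # only available where fork() exists
        os.register_at_fork(after_in_child=_reinit_in_child)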
@@ -605,10 +603,8 @@ def test_batch_log_record_processor_fork_doesnot_deadlock(self):
         )
 
         def child(conn):
-            def _target():
+            for _ in range(100):
                 log_record_processor.emit(EMPTY_LOG)
-
-            ConcurrencyTestBase.run_with_many_threads(_target, 100)
             log_record_processor.force_flush()
             logs = exporter.get_finished_logs()
             conn.send(len(logs) == 100)
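For reference, the parent/child handshake these fork tests rely on is a plain multiprocessing Pipe carrying a boolean result back from the forked child. A minimal sketch, assuming the "fork" start method set earlier (illustrative payload, not the real test):

    import multiprocessing

    def child(conn):
        conn.send(2 + 2 == 4)  # stand-in for "did the child flush exactly 100 logs?"
        conn.close()

    if __name__ == "__main__":
        parent_conn, child_conn = multiprocessing.Pipe()
        process = multiprocessing.Process(target=child, args=(child_conn,))
        process.start()
        assert parent_conn.recv()
        process.join()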
@@ -619,6 +615,7 @@ def _target():
         process.start()
         self.assertTrue(parent_conn.recv())
         process.join()
+        self.assertTrue(len(exporter.get_finished_logs()) == 0)
 
     def test_batch_log_record_processor_gc(self):
         # Given a BatchLogRecordProcessor
@@ -680,5 +677,4 @@ def formatter(record): # pylint: disable=unused-argument
         mock_stdout = Mock()
         exporter = ConsoleLogExporter(out=mock_stdout, formatter=formatter)
         exporter.export([EMPTY_LOG])
-
         mock_stdout.write.assert_called_once_with(mock_record_str)
