Merged
Changes from 2 commits
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -7,9 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
-- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
+- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code and make the control flow more
   clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
-  and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
+  [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535), and
+  [#4580](https://github.com/open-telemetry/opentelemetry-python/pull/4580)).
 
 
 ## Version 1.33.0/0.54b0 (2025-05-09)
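The file diff below reduces to a simple delegation pattern: `BatchSpanProcessor` keeps its public `SpanProcessor` surface, but all queueing, the worker thread, flushing, and shutdown move into the shared `BatchProcessor` from `opentelemetry.sdk._shared_internal` (a private module, per the new import below). As a reading aid, here is the refactored class condensed from the additions in the diff — not a verbatim copy of the module; the trailing `"Span"` constructor argument is assumed to label the telemetry type in log messages.

```python
from opentelemetry.sdk._shared_internal import BatchProcessor
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor


class BatchSpanProcessor(SpanProcessor):
    """Facade: all batching mechanics live in the shared BatchProcessor."""

    def __init__(self, span_exporter, schedule_delay_millis,
                 max_export_batch_size, export_timeout_millis, max_queue_size):
        self._batch_processor = BatchProcessor(
            span_exporter,
            schedule_delay_millis,
            max_export_batch_size,
            export_timeout_millis,  # currently unused; see the comment in the diff
            max_queue_size,
            "Span",  # assumed: names the telemetry type in log messages
        )

    def on_end(self, span: ReadableSpan) -> None:
        # Unsampled spans are still filtered here, before the shared queue.
        if not span.context.trace_flags.sampled:
            return
        self._batch_processor.emit(span)

    def shutdown(self):
        return self._batch_processor.shutdown()

    def force_flush(self, timeout_millis=None):
        return self._batch_processor.force_flush(timeout_millis)
```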
244 changes: 16 additions & 228 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -13,16 +13,11 @@
 # limitations under the License.
 from __future__ import annotations
 
-import collections
 import logging
-import os
 import sys
-import threading
 import typing
-import weakref
 from enum import Enum
 from os import environ, linesep
-from time import time_ns
 
 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
@@ -31,14 +26,14 @@
     detach,
     set_value,
 )
+from opentelemetry.sdk._shared_internal import BatchProcessor
 from opentelemetry.sdk.environment_variables import (
     OTEL_BSP_EXPORT_TIMEOUT,
     OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
     OTEL_BSP_MAX_QUEUE_SIZE,
     OTEL_BSP_SCHEDULE_DELAY,
 )
 from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
-from opentelemetry.util._once import Once
 
 _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
 _DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -125,19 +120,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
         return True
 
 
-class _FlushRequest:
-    """Represents a request for the BatchSpanProcessor to flush spans."""
-
-    __slots__ = ["event", "num_spans"]
-
-    def __init__(self):
-        self.event = threading.Event()
-        self.num_spans = 0
-
-
-_BSP_RESET_ONCE = Once()
-
-
 class BatchSpanProcessor(SpanProcessor):
     """Batch span processor implementation.
 
@@ -151,6 +133,8 @@ class BatchSpanProcessor(SpanProcessor):
     - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
     - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
     - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
+
+    All the logic for emitting spans, shutting down etc. resides in the BatchProcessor class.
     """
 
     def __init__(
@@ -174,6 +158,7 @@ def __init__(
                 BatchSpanProcessor._default_max_export_batch_size()
             )
 
+        # Not used. No way currently to pass timeout to export.
         if export_timeout_millis is None:
             export_timeout_millis = (
                 BatchSpanProcessor._default_export_timeout_millis()
@@ -183,227 +168,30 @@
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
 
-        self.span_exporter = span_exporter
-        self.queue = collections.deque([], max_queue_size)  # type: typing.Deque[Span]
-        self.worker_thread = threading.Thread(
-            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        self._batch_processor = BatchProcessor(
+            span_exporter,
+            schedule_delay_millis,
+            max_export_batch_size,
+            export_timeout_millis,
+            max_queue_size,
+            "Span",
         )
-        self.condition = threading.Condition(threading.Lock())
-        self._flush_request = None  # type: typing.Optional[_FlushRequest]
-        self.schedule_delay_millis = schedule_delay_millis
-        self.max_export_batch_size = max_export_batch_size
-        self.max_queue_size = max_queue_size
-        self.export_timeout_millis = export_timeout_millis
-        self.done = False
-        # flag that indicates that spans are being dropped
-        self._spans_dropped = False
-        # precallocated list to send spans to exporter
-        self.spans_list = [None] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
-        self.worker_thread.start()
-        if hasattr(os, "register_at_fork"):
-            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
-            os.register_at_fork(after_in_child=lambda: weak_reinit()())  # pylint: disable=unnecessary-lambda
-        self._pid = os.getpid()
 
     def on_start(
         self, span: Span, parent_context: Context | None = None
    ) -> None:
         pass
 
     def on_end(self, span: ReadableSpan) -> None:
-        if self.done:
-            logger.warning("Already shutdown, dropping span.")
-            return
         if not span.context.trace_flags.sampled:
             return
-        if self._pid != os.getpid():
-            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
-
-        if len(self.queue) == self.max_queue_size:
-            if not self._spans_dropped:
-                logger.warning("Queue is full, likely spans will be dropped.")
-                self._spans_dropped = True
-
-        self.queue.appendleft(span)
-
-        if len(self.queue) >= self.max_export_batch_size:
-            with self.condition:
-                self.condition.notify()
-
-    def _at_fork_reinit(self):
-        self.condition = threading.Condition(threading.Lock())
-        self.queue.clear()
-
-        # worker_thread is local to a process, only the thread that issued fork continues
-        # to exist. A new worker thread must be started in child process.
-        self.worker_thread = threading.Thread(
-            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
-        )
-        self.worker_thread.start()
-        self._pid = os.getpid()
-
-    def worker(self):
-        timeout = self.schedule_delay_millis / 1e3
-        flush_request = None  # type: typing.Optional[_FlushRequest]
-        while not self.done:
-            with self.condition:
-                if self.done:
-                    # done flag may have changed, avoid waiting
-                    break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self.queue) < self.max_export_batch_size
-                    and flush_request is None
-                ):
-                    self.condition.wait(timeout)
-                    flush_request = self._get_and_unset_flush_request()
-                    if not self.queue:
-                        # spurious notification, let's wait again, reset timeout
-                        timeout = self.schedule_delay_millis / 1e3
-                        self._notify_flush_request_finished(flush_request)
-                        flush_request = None
-                        continue
-                    if self.done:
-                        # missing spans will be sent when calling flush
-                        break
-
-            # subtract the duration of this export call to the next timeout
-            start = time_ns()
-            self._export(flush_request)
-            end = time_ns()
-            duration = (end - start) / 1e9
-            timeout = self.schedule_delay_millis / 1e3 - duration
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self.condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # be sure that all spans are sent
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _get_and_unset_flush_request(
-        self,
-    ) -> typing.Optional[_FlushRequest]:
-        """Returns the current flush request and makes it invisible to the
-        worker thread for subsequent calls.
-        """
-        flush_request = self._flush_request
-        self._flush_request = None
-        if flush_request is not None:
-            flush_request.num_spans = len(self.queue)
-        return flush_request
-
-    @staticmethod
-    def _notify_flush_request_finished(
-        flush_request: typing.Optional[_FlushRequest],
-    ):
-        """Notifies the flush initiator(s) waiting on the given request/event
-        that the flush operation was finished.
-        """
-        if flush_request is not None:
-            flush_request.event.set()
-
-    def _get_or_create_flush_request(self) -> _FlushRequest:
-        """Either returns the current active flush event or creates a new one.
+        self._batch_processor.emit(span)
 
-        The flush event will be visible and read by the worker thread before an
-        export operation starts. Callers of a flush operation may wait on the
-        returned event to be notified when the flush/export operation was
-        finished.
+    def shutdown(self):
+        return self._batch_processor.shutdown()
 
-        This method is not thread-safe, i.e. callers need to take care about
-        synchronization/locking.
-        """
-        if self._flush_request is None:
-            self._flush_request = _FlushRequest()
-        return self._flush_request
-
-    def _export(self, flush_request: typing.Optional[_FlushRequest]):
-        """Exports spans considering the given flush_request.
-
-        In case of a given flush_requests spans are exported in batches until
-        the number of exported spans reached or exceeded the number of spans in
-        the flush request.
-        In no flush_request was given at most max_export_batch_size spans are
-        exported.
-        """
-        if not flush_request:
-            self._export_batch()
-            return
-
-        num_spans = flush_request.num_spans
-        while self.queue:
-            num_exported = self._export_batch()
-            num_spans -= num_exported
-
-            if num_spans <= 0:
-                break
-
-    def _export_batch(self) -> int:
-        """Exports at most max_export_batch_size spans and returns the number of
-        exported spans.
-        """
-        idx = 0
-        # currently only a single thread acts as consumer, so queue.pop() will
-        # not raise an exception
-        while idx < self.max_export_batch_size and self.queue:
-            self.spans_list[idx] = self.queue.pop()
-            idx += 1
-        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-        try:
-            # Ignore type b/c the Optional[None]+slicing is too "clever"
-            # for mypy
-            self.span_exporter.export(self.spans_list[:idx])  # type: ignore
-        except Exception:  # pylint: disable=broad-exception-caught
-            logger.exception("Exception while exporting Span batch.")
-        detach(token)
-
-        # clean up list
-        for index in range(idx):
-            self.spans_list[index] = None
-        return idx
-
-    def _drain_queue(self):
-        """Export all elements until queue is empty.
-
-        Can only be called from the worker thread context because it invokes
-        `export` that is not thread safe.
-        """
-        while self.queue:
-            self._export_batch()
-
-    def force_flush(self, timeout_millis: int | None = None) -> bool:
-        if timeout_millis is None:
-            timeout_millis = self.export_timeout_millis
-
-        if self.done:
-            logger.warning("Already shutdown, ignoring call to force_flush().")
-            return True
-
-        with self.condition:
-            flush_request = self._get_or_create_flush_request()
-            # signal the worker thread to flush and wait for it to finish
-            self.condition.notify_all()
-
-        # wait for token to be processed
-        ret = flush_request.event.wait(timeout_millis / 1e3)
-        if not ret:
-            logger.warning("Timeout was exceeded in force_flush().")
-        return ret
-
-    def shutdown(self) -> None:
-        # signal the worker thread to finish and then wait for it
-        self.done = True
-        with self.condition:
-            self.condition.notify_all()
-        self.worker_thread.join()
-        self.span_exporter.shutdown()
+    def force_flush(self, timeout_millis: typing.Optional[int] = None):
+        return self._batch_processor.force_flush(timeout_millis)
 
     @staticmethod
     def _default_max_queue_size():
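The refactor leaves the public API untouched, so pipelines wire up exactly as before. A minimal end-to-end sketch using the public SDK API (`ConsoleSpanExporter` stands in for any real exporter; the batching knobs can also come from the `OTEL_BSP_*` environment variables listed in the class docstring):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
# Spans are queued here and exported in batches by the shared worker thread.
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("example-instrumentation")
with tracer.start_as_current_span("demo-operation"):
    pass  # application work happens here

# shutdown() drains the queue via the shared BatchProcessor before exit.
provider.shutdown()
```

`force_flush(timeout_millis)` likewise remains available for explicit flushes and now simply delegates to the shared implementation.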