Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
478d1f3
Initial commit to add timeout as a param to export, make retries encom…
DylanRussell Apr 30, 2025
ccdd224
Fix lint issues
DylanRussell Apr 30, 2025
5bc8894
Fix a bunch of failing style/lint/spellcheck checks
DylanRussell May 1, 2025
ba92c5a
Remove timeout param from the export calls.
DylanRussell May 2, 2025
29144a1
Fix flaky windows test ?
DylanRussell May 2, 2025
838d7d9
Merge branch 'main' into retry2
DylanRussell May 6, 2025
66a4ebe
Merge branch 'main' into retry2
DylanRussell May 8, 2025
95ccfea
Respond to review comments..
DylanRussell May 9, 2025
d5ca894
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell May 12, 2025
8770e15
Delete exponential backoff code that is now unused
DylanRussell May 13, 2025
4c74411
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 13, 2025
f373caa
Add changelog and remove some unused imports..
DylanRussell May 13, 2025
d1e04e1
fix typo and unit test flaking on windows
DylanRussell May 13, 2025
f42ecd3
Refactor tests, HTTP exporters a bit
DylanRussell May 22, 2025
096b9f8
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
8673b45
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
46e15f1
Remove unneeded test reqs
DylanRussell May 22, 2025
dcba91a
Remove gRPC retry config
DylanRussell Jun 5, 2025
d506d54
Merge remote-tracking branch 'origin' into retry2
DylanRussell Jun 5, 2025
71b77e1
Tweak backoff calculation
DylanRussell Jun 5, 2025
2ae79bb
Lint and precommit
DylanRussell Jun 5, 2025
553ea3e
Empty commit
DylanRussell Jun 5, 2025
28b9399
Another empty commit
DylanRussell Jun 5, 2025
b4df54a
Calculate backoff in 1 place instead of 2
DylanRussell Jun 5, 2025
9e1ba28
Update changelog
DylanRussell Jun 5, 2025
0b54090
Update changelog
DylanRussell Jun 5, 2025
bc3110a
Make new _common directory in the http exporter for shared code
DylanRussell Jun 5, 2025
4bbecf8
precommit
DylanRussell Jun 5, 2025
3076c0f
Make many changes
DylanRussell Jun 9, 2025
f2583e0
Reorder shutdown stuff
DylanRussell Jun 10, 2025
f503053
Merge remote-tracking branch 'origin/main' into shutdown_refactor
DylanRussell Jun 13, 2025
5fa8c23
Fix merging
DylanRussell Jun 13, 2025
62d3699
Don't join the thread in case we are stuck in an individual export call
DylanRussell Jun 16, 2025
eb59db3
Add tests, changelog entry
DylanRussell Jun 16, 2025
61c28da
Update time assertions to satisfy windows.. Fix lint issues
DylanRussell Jun 16, 2025
23cd24a
Skip test on windows
DylanRussell Jun 16, 2025
b81c619
Merge branch 'main' into shutdown_refactor
DylanRussell Jun 17, 2025
33a860f
Use threading Event instead of sleep loop.
DylanRussell Jun 27, 2025
deec916
Merge remote-tracking branch 'origin' into shutdown_refactor
DylanRussell Jul 14, 2025
fb7f320
Respond to review comments..
DylanRussell Jul 14, 2025
79946d8
Pass remaining timeout to shutdown
DylanRussell Jul 16, 2025
adbe4b0
Run precommit
DylanRussell Jul 16, 2025
f47c24d
Change variable names
DylanRussell Jul 17, 2025
48309b6
Switch timeout back to timeout_millis
DylanRussell Jul 17, 2025
5218399
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell Jul 17, 2025
023f259
Update CHANGELOG.md
DylanRussell Jul 18, 2025
768435b
Update CHANGELOG.md
DylanRussell Jul 18, 2025
950f57a
Rename variable
DylanRussell Jul 18, 2025
eb17ec2
Merge branch 'shutdown_refactor' of github.com:DylanRussell/opentelem…
DylanRussell Jul 18, 2025
235d22d
Fix variable name
DylanRussell Jul 18, 2025
dbfd27d
Merge branch 'main' into shutdown_refactor
DylanRussell Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased


- Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping
before a retry attempt, and cause them to return failure immediately.
Update BatchSpan/LogRecordProcessors: shutdown will now complete after 30 seconds of trying to finish
exporting any buffered telemetry, instead of continuing to export until all telemetry is exported.
([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)).

## Version 1.35.0/0.56b0 (2025-07-11)

- Update OTLP proto to v1.7 [#4645](https://github.com/open-telemetry/opentelemetry-python/pull/4645).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep, time
from time import time
from typing import ( # noqa: F401
Any,
Callable,
Expand Down Expand Up @@ -289,7 +289,7 @@ def __init__(
)
self._client = self._stub(self._channel)

self._export_lock = threading.Lock()
self._shutdown_is_occuring = threading.Event()
self._shutdown = False

@abstractmethod
Expand All @@ -309,62 +309,63 @@ def _export(
# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
with self._export_lock:
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=deadline_sec - time(),
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=deadline_sec - time(),
)
return self._result.SUCCESS
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
backoff_seconds = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)
return self._result.SUCCESS
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
backoff_seconds = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)
if (
error.code() not in _RETRYABLE_ERROR_CODES
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
):
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE
logger.warning(
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
error.code(),
if (
error.code() not in _RETRYABLE_ERROR_CODES
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
backoff_seconds,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
sleep(backoff_seconds)
return self._result.FAILURE
logger.warning(
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
error.code(),
self._exporting,
self._endpoint,
backoff_seconds,
)
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
if shutdown:
logger.warning("Shutdown in progress, aborting retry.")
break
# Not possible to reach here but the linter is complaining.
return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis / 1e3)
self._shutdown = True
self._shutdown_is_occuring.set()
self._channel.close()
self._export_lock.release()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,48 +325,40 @@ def test_shutdown(self):
"Exporter already shutdown, ignoring batch",
)

def test_shutdown_wait_last_export(self):
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(
StatusCode.OK, optional_export_sleep=1
),
self.server,
)

export_thread = ThreadWithReturnValue(
target=self.exporter.export, args=([self.span],)
)
export_thread.start()
# Wait a bit for exporter to get lock and make export call.
time.sleep(0.25)
# pylint: disable=protected-access
self.assertTrue(self.exporter._export_lock.locked())
self.exporter.shutdown(timeout_millis=3000)
# pylint: disable=protected-access
self.assertTrue(self.exporter._shutdown)
self.assertEqual(export_thread.join(), SpanExportResult.SUCCESS)

def test_shutdown_doesnot_wait_last_export(self):
@unittest.skipIf(
system() == "Windows",
"For gRPC + windows there's some added delay in the RPCs which breaks the assertion over amount of time passed.",
)
def test_shutdown_interrupts_export_retry_backoff(self):
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(
StatusCode.OK, optional_export_sleep=3
StatusCode.UNAVAILABLE,
),
self.server,
)

export_thread = ThreadWithReturnValue(
target=self.exporter.export, args=([self.span],)
)
export_thread.start()
# Wait for exporter to get lock and make export call.
time.sleep(0.25)
# pylint: disable=protected-access
self.assertTrue(self.exporter._export_lock.locked())
# Set to 1 seconds, so the 3 second server-side delay will not be reached.
self.exporter.shutdown(timeout_millis=1000)
# pylint: disable=protected-access
self.assertTrue(self.exporter._shutdown)
self.assertEqual(export_thread.join(), None)
with self.assertLogs(level=WARNING) as warning:
begin_wait = time.time()
export_thread.start()
# Wait a bit for export to fail and the backoff sleep to start
time.sleep(0.05)
# The code should now be in a 1 second backoff.
# pylint: disable=protected-access
self.assertFalse(self.exporter._shutdown_is_occuring.is_set())
self.exporter.shutdown()
self.assertTrue(self.exporter._shutdown_is_occuring.is_set())
export_result = export_thread.join()
end_wait = time.time()
self.assertEqual(export_result, SpanExportResult.FAILURE)
# Shutdown should have interrupted the sleep.
self.assertTrue(end_wait - begin_wait < 0.2)
self.assertEqual(
warning.records[1].message,
"Shutdown in progress, aborting retry.",
)

def test_export_over_closed_grpc_channel(self):
# pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import gzip
import logging
import random
import threading
import zlib
from io import BytesIO
from os import environ
from time import sleep, time
from time import time
from typing import Dict, Optional, Sequence

import requests
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
compression: Optional[Compression] = None,
session: Optional[requests.Session] = None,
):
self._shutdown_is_occuring = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
_append_logs_path(
Expand Down Expand Up @@ -173,6 +175,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
Expand All @@ -185,8 +188,10 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
resp.reason,
backoff_seconds,
)
sleep(backoff_seconds)
# Not possible to reach here but the linter is complaining.
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
if shutdown:
_logger.warning("Shutdown in progress, aborting retry.")
break
return LogExportResult.FAILURE

def force_flush(self, timeout_millis: float = 10_000) -> bool:
Expand All @@ -197,8 +202,9 @@ def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._session.close()
self._shutdown = True
self._shutdown_is_occuring.set()
self._session.close()


def _compression_from_env() -> Compression:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import gzip
import logging
import random
import threading
import zlib
from io import BytesIO
from os import environ
from time import sleep, time
from time import time
from typing import ( # noqa: F401
Any,
Callable,
Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(
| None = None,
preferred_aggregation: dict[type, Aggregation] | None = None,
):
self._shutdown_is_occuring = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
_append_metrics_path(
Expand Down Expand Up @@ -223,6 +225,7 @@ def export(
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export metrics batch code: %s, reason: %s",
Expand All @@ -235,16 +238,19 @@ def export(
resp.reason,
backoff_seconds,
)
sleep(backoff_seconds)
# Not possible to reach here but the linter is complaining.
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
if shutdown:
_logger.warning("Shutdown in progress, aborting retry.")
break
return MetricExportResult.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._session.close()
self._shutdown = True
self._shutdown_is_occuring.set()
self._session.close()

@property
def _exporting(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import gzip
import logging
import random
import threading
import zlib
from io import BytesIO
from os import environ
from time import sleep, time
from time import time
from typing import Dict, Optional, Sequence

import requests
Expand Down Expand Up @@ -76,6 +77,7 @@ def __init__(
compression: Optional[Compression] = None,
session: Optional[requests.Session] = None,
):
self._shutdown_is_occuring = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
_append_trace_path(
Expand Down Expand Up @@ -171,6 +173,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export span batch code: %s, reason: %s",
Expand All @@ -183,16 +186,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
resp.reason,
backoff_seconds,
)
sleep(backoff_seconds)
# Not possible to reach here but the linter is complaining.
shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
if shutdown:
_logger.warning("Shutdown in progress, aborting retry.")
break
return SpanExportResult.FAILURE

def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._session.close()
self._shutdown = True
self._shutdown_is_occuring.set()
self._session.close()

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
Expand Down
Loading
Loading