Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
from opentelemetry.instrumentation.milvus.utils import dont_throw
from opentelemetry.semconv.trace import SpanAttributes

from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry import context as context_api
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
)
from opentelemetry.semconv_ai import Events, EventAttributes
from opentelemetry.semconv_ai import SpanAttributes as AISpanAttributes

code_to_error_type = dict(
[
(1, "invalid_arguments"),
(2, "empty_collection"),
(3, "collection_already_exists"),
(4, "collection_not_loaded"),
(5, "partition_not_loaded"),
(6, "segment_not_loaded"),
(7, "database_not_exist"),
(100, "collection_not_exist"),
(111, "partition_not_exist"),
(112, "index_not_exist"),
(2000, "internal_error"),
(2100, "search_execution_error"),
(2200, "vector_data_invalid"),
(3000, "timeout_error"),
(3100, "index_build_timeout"),
(3200, "query_node_timeout"),
(4000, "connection_failed"),
(4100, "channel_not_ready"),
(5000, "authentication_failed"),
(5100, "permission_denied"),
(5200, "token_expired"),
]
)


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""
Expand Down Expand Up @@ -56,16 +82,20 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
elif to_wrap.get("method") == "hybrid_search":
_set_hybrid_search_attributes(span, kwargs)

return_value = wrapped(*args, **kwargs)

if to_wrap.get("method") == "query":
_add_query_result_events(span, return_value)

if (
to_wrap.get("method") == "search"
or to_wrap.get("method") == "hybrid_search"
):
_add_search_result_events(span, return_value)
try:
return_value = wrapped(*args, **kwargs)
if to_wrap.get("method") == "query":
_add_query_result_events(span, return_value)

if (
to_wrap.get("method") == "search"
or to_wrap.get("method") == "hybrid_search"
):
_add_search_result_events(span, return_value)
except Exception as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should also set the span status as error

Copy link
Contributor Author

@divyapathak24 divyapathak24 Jun 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nirga I have raised the exception at line 85 in wrapper.py, which automatically sets the span status to "error"—as shown in the attached screenshot as well.

Screenshot 2025-06-29 at 9 04 33 AM

I have also added a test to verify that the span status is correctly set to "error" line 88 in test_error.py

error_type = code_to_error_type.get(e.code, type(e).__name__)
span.set_attribute(ERROR_TYPE, error_type)
raise

return return_value

Expand Down Expand Up @@ -345,6 +375,7 @@ def set_global_stats():
_set_span_attribute(
span, AISpanAttributes.MILVUS_SEARCH_RESULT_COUNT, total_matches
)

for query_idx, query_results in enumerate(kwargs):

query_distances = []
Expand Down
88 changes: 88 additions & 0 deletions packages/opentelemetry-instrumentation-milvus/tests/test_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
import random

import pymilvus
import pytest
from opentelemetry.semconv_ai import Events, SpanAttributes, EventAttributes

path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "milvus.db")
milvus = pymilvus.MilvusClient(uri=path)


@pytest.fixture
def collection():
collection_name = "Colors"
milvus.create_collection(collection_name=collection_name, dimension=5)
yield collection_name
milvus.drop_collection(collection_name=collection_name)


def insert_data(collection):
colors = [
"green",
"blue",
"yellow",
"red",
"black",
"white",
"purple",
"pink",
"orange",
"grey",
]
data = [
{
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(5)],
"color": random.choice(colors),
"tag": random.randint(1000, 9999),
}
for i in range(1000)
]
data += [
{
"id": 1000,
"vector": [random.uniform(-1, 1) for _ in range(5)],
"color": "brown",
"tag": 1234,
},
{
"id": 1001,
"vector": [random.uniform(-1, 1) for _ in range(5)],
"color": "brown",
"tag": 5678,
},
{
"id": 1002,
"vector": [random.uniform(-1, 1) for _ in range(5)],
"color": "brown",
"tag": 9101,
},
]
for i in data:
i["color_tag"] = "{}_{}".format(i["color"], i["tag"])
milvus.insert(collection_name=collection, data=data)


def test_milvus_single_vector_search(exporter, collection):
insert_data(collection)

bad_query_vector = [random.uniform(-1, 1) for _ in range(2)]
search_params = {"radius": 0.5, "metric_type": "COSINE", "index_type": "IVF_FLAT"}
with pytest.raises(Exception) as exc_info:
milvus.search(
collection_name=collection,
data=[bad_query_vector],
anns_field="vector",
search_params=search_params,
output_fields=["color_tag"],
limit=3,
timeout=10,
)

# Get finished spans
spans = exporter.get_finished_spans()
span = next(span for span in spans if span.name == "milvus.search")

# Check the span attributes related to search
assert span.attributes.get("error.type") == "internal_error"
Loading