diff --git a/apps/opik-documentation/documentation/fern/docs/tracing/integrations/claude-code.mdx b/apps/opik-documentation/documentation/fern/docs/tracing/integrations/claude-code.mdx new file mode 100644 index 0000000000..49d260feee --- /dev/null +++ b/apps/opik-documentation/documentation/fern/docs/tracing/integrations/claude-code.mdx @@ -0,0 +1,274 @@ +--- +description: "Log traces for all Claude Code Python SDK LLM calls" +--- + +# Claude Code + +[Claude Code](https://docs.anthropic.com/claude/docs/claude-code) is Anthropic's coding assistant that provides programmatic access to Claude's code generation and analysis capabilities through a Python SDK. + +![Claude Code Integration](/img/tracing/claude_code_integration.png) + +## Getting Started + +### Account Setup + +To use Claude Code with Opik, you'll need: + +1. Create a free [Comet account](https://comet.com/signup?utm_source=opik&utm_medium=docs&utm_campaign=claude-code) +2. Get your [Claude Code API key](https://console.anthropic.com/) from Anthropic + +### Installation + +```bash +pip install opik claude-code-sdk +``` + +### Configuration + +Configure Opik to track your Claude Code calls: + +```python +import opik + +opik.configure() +``` + +### Environment Setup + +Configure your API keys: + +```bash +export ANTHROPIC_API_KEY="your-claude-api-key" +``` + +## Basic Usage + +Use `track_claude_code` to track your Claude Code query calls: + +```python +import asyncio +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query, ClaudeCodeOptions + +# Track the Claude Code query function +tracked_query = track_claude_code(query, project_name="claude-code-project") + +async def main(): + # Use the tracked query function + async for message in tracked_query("What is 2 + 2?"): + print(message) + +asyncio.run(main()) +``` + +## Advanced Usage + +### Using with `@track` decorator + +Combine Claude Code tracking with Opik's `@track` decorator for more detailed tracing: + +```python +import asyncio +from opik import track +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query, ClaudeCodeOptions + +# Track the Claude Code query function +tracked_query = track_claude_code(query) + +@track +async def generate_code(requirements: str): + """Generate code based on requirements.""" + options = ClaudeCodeOptions( + allowed_tools=["Read", "Write"], + system_prompt="You are an expert Python developer.", + max_turns=3 + ) + + responses = [] + async for message in tracked_query( + f"Create a Python function that {requirements}", + options=options + ): + responses.append(message) + + return responses + +# This will create a trace with nested spans +result = asyncio.run(generate_code("calculates fibonacci numbers")) +``` + +### Tool Usage Tracking + +Claude Code supports various tools. The integration automatically tracks tool usage: + +```python +import asyncio +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query, ClaudeCodeOptions + +tracked_query = track_claude_code(query) + +async def main(): + options = ClaudeCodeOptions( + allowed_tools=["Read", "Write", "Bash"], + system_prompt="You are a helpful file assistant." + ) + + async for message in tracked_query( + "Create a file called hello.txt with 'Hello, World!' and then read it back", + options=options + ): + print(f"Message type: {type(message).__name__}") + print(f"Content: {message}") + +asyncio.run(main()) +``` + +### Custom Span Configuration + +Use the advanced tracking function for more control: + +```python +from opik.integrations.claude_code import track_claude_code_with_options +from claude_code_sdk import query + +tracked_query = track_claude_code_with_options( + query, + project_name="advanced-project", + span_name="code_generation", + tags=["code", "assistant"], + metadata={"version": "1.0", "model": "claude-3"} +) +``` + +### Cost Tracking + +The integration automatically captures cost information from Claude Code's `ResultMessage`: + +```python +import asyncio +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query + +tracked_query = track_claude_code(query) + +async def main(): + total_cost = 0.0 + + async for message in tracked_query("Generate a complex algorithm"): + # Cost information is automatically tracked in Opik + if hasattr(message, 'total_cost_usd') and message.total_cost_usd: + total_cost += message.total_cost_usd + print(f"Current cost: ${total_cost:.4f}") + +asyncio.run(main()) +``` + +## What's Captured + +The Claude Code integration captures: + +- **Prompts**: Input prompts sent to Claude +- **Options**: System prompts, allowed tools, and configuration +- **Responses**: All assistant messages and content +- **Cost**: Token usage and cost information +- **Tool Usage**: Which tools were allowed and used +- **Conversation Flow**: Complete async message streams +- **Errors**: Any exceptions during code generation + +## Advanced Features + +### Async Generator Handling + +Claude Code uses async generators for streaming responses. The integration handles this automatically: + +```python +import asyncio +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query + +tracked_query = track_claude_code(query) + +async def stream_code_generation(): + async for message in tracked_query("Create a web scraper in Python"): + # Each message is tracked as part of the overall span + yield message + +# The entire generator execution is captured as one span +async def main(): + async for chunk in stream_code_generation(): + print(chunk) + +asyncio.run(main()) +``` + +### System Prompt Tracking + +System prompts and configuration are automatically captured: + +```python +import asyncio +from opik.integrations.claude_code import track_claude_code +from claude_code_sdk import query, ClaudeCodeOptions + +tracked_query = track_claude_code(query) + +async def main(): + options = ClaudeCodeOptions( + system_prompt="You are a security-focused developer. Always include error handling.", + allowed_tools=["Read", "Write"], + max_turns=5 + ) + + async for message in tracked_query( + "Create a secure file upload function", + options=options + ): + print(message) + # System prompt and tools are captured in span metadata + +asyncio.run(main()) +``` + +## Troubleshooting + +### Common Issues + +1. **Import Error**: Ensure you have `claude-code-sdk` installed: + ```bash + pip install claude-code-sdk + ``` + +2. **API Key Issues**: Verify your Anthropic API key is set: + ```bash + echo $ANTHROPIC_API_KEY + ``` + +3. **Async Context Required**: Claude Code requires async/await: + ```python + # ❌ Wrong - missing async/await + for message in tracked_query("prompt"): + print(message) + + # ✅ Correct - using async/await + async for message in tracked_query("prompt"): + print(message) + ``` + +4. **Double Wrapping Warning**: If you see warnings about already tracked functions, ensure you're not wrapping the same function twice: + ```python + # ❌ Wrong - double wrapping + tracked_query = track_claude_code(query) + double_tracked = track_claude_code(tracked_query) # Warning! + + # ✅ Correct - single wrapping + tracked_query = track_claude_code(query) + ``` + +### Performance Considerations + +- The integration is designed to be non-blocking and doesn't interfere with Claude Code's async generator performance +- Cost tracking happens automatically without additional API calls +- All tracking is done in the background using Opik's efficient batching system + diff --git a/sdks/python/examples/claude_code_integration_example.py b/sdks/python/examples/claude_code_integration_example.py new file mode 100644 index 0000000000..cc673806cd --- /dev/null +++ b/sdks/python/examples/claude_code_integration_example.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +""" +Example demonstrating Opik integration with Claude Code Python SDK. + +This example shows how to: +1. Configure Opik to track Claude Code calls +2. Use the tracked query function +3. Handle async generators +4. Track cost information +5. Use with custom options + +Prerequisites: +- pip install opik claude-code-sdk +- export ANTHROPIC_API_KEY="your-api-key" +""" + +import asyncio +import opik +from opik.integrations.claude_code import ( + track_claude_code, + track_claude_code_with_options, +) + +# Note: This example uses mock objects since claude-code-sdk might not be available +# In a real scenario, you would import from claude_code_sdk: +# from claude_code_sdk import query, ClaudeCodeOptions + + +# Mock classes for demonstration (replace with real claude_code_sdk imports) +class MockTextBlock: + def __init__(self, text: str): + self.text = text + + +class MockAssistantMessage: + def __init__(self, content: list): + self.content = content + + +class MockResultMessage: + def __init__(self, total_cost_usd: float = 0.0): + self.total_cost_usd = total_cost_usd + + +class MockClaudeCodeOptions: + def __init__(self, system_prompt=None, allowed_tools=None, max_turns=None): + self.system_prompt = system_prompt + self.allowed_tools = allowed_tools + self.max_turns = max_turns + + +# Mock query function (replace with real claude_code_sdk.query) +async def mock_query(prompt: str, options=None): + """Mock query function for demonstration.""" + yield MockAssistantMessage([MockTextBlock(f"Processing: {prompt}")]) + yield MockAssistantMessage( + [MockTextBlock("Here's your generated code: def example(): pass")] + ) + yield MockResultMessage(total_cost_usd=0.0023) + + +async def basic_example(): + """Basic Claude Code tracking example.""" + print("=== Basic Claude Code Tracking Example ===") + + # Configure Opik + opik.configure() + + # Track the Claude Code query function + tracked_query = track_claude_code(mock_query, project_name="claude-code-demo") + + # Use the tracked function + async for message in tracked_query("Create a simple Python function"): + if hasattr(message, "content"): + for block in message.content: + if hasattr(block, "text"): + print(f"Claude: {block.text}") + elif hasattr(message, "total_cost_usd"): + print(f"Cost: ${message.total_cost_usd:.4f}") + + print("✓ Basic example completed - check Opik UI for traces\n") + + +async def advanced_example(): + """Advanced example with custom options and tracking.""" + print("=== Advanced Claude Code Tracking Example ===") + + # Track with custom configuration + tracked_query = track_claude_code_with_options( + mock_query, + project_name="claude-code-advanced", + span_name="code_generation", + tags=["code", "python", "assistant"], + metadata={"version": "1.0", "model": "claude-3"}, + ) + + # Use with Claude Code options + options = MockClaudeCodeOptions( + system_prompt="You are an expert Python developer. Always include docstrings.", + allowed_tools=["Read", "Write"], + max_turns=3, + ) + + async for message in tracked_query( + "Create a function to calculate the factorial of a number", options=options + ): + if hasattr(message, "content"): + for block in message.content: + if hasattr(block, "text"): + print(f"Claude: {block.text}") + elif hasattr(message, "total_cost_usd"): + print(f"Cost: ${message.total_cost_usd:.4f}") + + print("✓ Advanced example completed - check Opik UI for detailed traces\n") + + +@opik.track +async def nested_tracking_example(): + """Example showing nested tracking with @opik.track decorator.""" + print("=== Nested Tracking Example ===") + + tracked_query = track_claude_code(mock_query) + + # This will create a parent span for the entire function + results = [] + async for message in tracked_query("Generate a simple web scraper"): + results.append(message) + + return {"message_count": len(results), "status": "completed"} + + +async def error_handling_example(): + """Example showing error handling.""" + print("=== Error Handling Example ===") + + # Mock function that raises an error + async def mock_error_query(prompt: str, options=None): + yield MockAssistantMessage([MockTextBlock("Starting task...")]) + raise ValueError("Mock error for demonstration") + + tracked_query = track_claude_code(mock_error_query) + + try: + async for message in tracked_query("This will fail"): + print(f"Received: {message}") + except ValueError as e: + print(f"Caught expected error: {e}") + print("✓ Error was properly tracked in Opik\n") + + +async def main(): + """Run all examples.""" + print("Claude Code + Opik Integration Examples") + print("=" * 50) + + try: + await basic_example() + await advanced_example() + + result = await nested_tracking_example() + print(f"✓ Nested tracking completed: {result}\n") + + await error_handling_example() + + print("All examples completed successfully!") + print("Check your Opik dashboard to see the traced calls.") + + except Exception as e: + print(f"Error running examples: {e}") + print("Make sure you have configured Opik properly with: opik.configure()") + + +if __name__ == "__main__": + # Run the examples + asyncio.run(main()) diff --git a/sdks/python/examples/test_claude_code_real.py b/sdks/python/examples/test_claude_code_real.py new file mode 100644 index 0000000000..b350d6bbc9 --- /dev/null +++ b/sdks/python/examples/test_claude_code_real.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +Test script demonstrating the Claude Code integration with a real-like structure. + +This simulates the actual usage you showed in your test file to verify +that the integration properly handles the conversation flow. +""" + +import asyncio +from typing import AsyncGenerator, Any, List + + +# Mock the actual claude_code_sdk classes to match your real usage +class TextBlock: + def __init__(self, text: str): + self.text = text + + +class AssistantMessage: + def __init__(self, content: List[TextBlock]): + self.content = content + + +class ResultMessage: + def __init__(self, total_cost_usd: float = 0.0): + self.total_cost_usd = total_cost_usd + + +class SystemMessage: + def __init__(self, content: str): + self.content = content + + +class UserMessage: + def __init__(self, content: str): + self.content = content + + +# Mock Claude Code query function that simulates your real usage +async def mock_query(prompt: str, options=None) -> AsyncGenerator[Any, None]: + """Mock function that simulates the real Claude Code conversation flow you showed.""" + + # Simulate the complex conversation flow you showed + yield SystemMessage("System initialized") + + yield AssistantMessage( + [ + TextBlock( + "I'll create a file called `hello.txt` with the content 'Hello, World!' for you." + ) + ] + ) + + yield AssistantMessage( + [ + TextBlock( + "Let me check the current directory first to create the file with an absolute path:" + ) + ] + ) + + yield UserMessage("pwd command executed") + + yield AssistantMessage( + [TextBlock("Now I'll create the file with the absolute path:")] + ) + + yield UserMessage("write file command executed") + + yield AssistantMessage( + [ + TextBlock( + "Since this is a new file, let me try reading it first (even though it doesn't exist yet) and then create it:" + ) + ] + ) + + yield UserMessage("read file command executed") + + yield AssistantMessage( + [ + TextBlock( + "It appears the file already exists with the content 'Hello, World!' in it. The file has been successfully created with the requested content." + ) + ] + ) + + yield ResultMessage(total_cost_usd=0.0234) + + +async def test_claude_code_integration(): + """Test the Claude Code integration.""" + print("Testing Claude Code integration...") + + # Import the integration (this would normally be from claude_code_sdk import query) + from opik.integrations.claude_code import track_claude_code + + # Track the query function + tracked_query = track_claude_code(mock_query) + + print("\nStarting Claude Code conversation...") + messages = [] + async for message in tracked_query( + "Create a file called hello.txt with 'Hello, World!' in it" + ): + messages.append(message) + print(f"message {type(message).__name__}") + + if isinstance(message, AssistantMessage): + for block in message.content: + if isinstance(block, TextBlock): + print(f"Claude: {block.text}") + elif isinstance(message, ResultMessage) and message.total_cost_usd > 0: + print(f"\nCost: ${message.total_cost_usd:.4f}") + + print(f"\nProcessed {len(messages)} messages total") + + # Flush to ensure all traces are sent + import opik + + client = opik.Opik() + client.flush() + + print("✅ Integration test completed! Check the Opik UI for the trace.") + print("🔍 You should see:") + print(" - A main 'claude_code_query' trace") + print(" - Structured conversation flow in the output") + print(" - Individual assistant messages clearly readable") + print(" - Cost information and conversation summary") + + +if __name__ == "__main__": + # Configure Opik (assumes you have OPIK_API_KEY set) + import opik + + try: + opik.configure() + asyncio.run(test_claude_code_integration()) + except Exception as e: + print(f"❌ Test failed: {e}") + print("💡 Make sure you have configured Opik (run `opik configure`)") diff --git a/sdks/python/src/opik/integrations/claude_code/__init__.py b/sdks/python/src/opik/integrations/claude_code/__init__.py new file mode 100644 index 0000000000..67929cc7a9 --- /dev/null +++ b/sdks/python/src/opik/integrations/claude_code/__init__.py @@ -0,0 +1,10 @@ +""" +Opik integration for Claude Code Python SDK. + +This module provides integration with Anthropic's Claude Code SDK, +allowing automatic tracking of Claude Code API calls as Opik spans and traces. +""" + +from .opik_tracker import track_claude_code, track_claude_code_with_options + +__all__ = ["track_claude_code", "track_claude_code_with_options"] diff --git a/sdks/python/src/opik/integrations/claude_code/opik_tracker.py b/sdks/python/src/opik/integrations/claude_code/opik_tracker.py new file mode 100644 index 0000000000..ce6d092819 --- /dev/null +++ b/sdks/python/src/opik/integrations/claude_code/opik_tracker.py @@ -0,0 +1,677 @@ +from typing import Optional, Callable, Any, AsyncGenerator +import logging + +from opik.decorator.tracker import track +from opik.types import LLMProvider +import opik.opik_context as opik_context +from opik.api_objects import opik_client +from opik.decorator import tracing_runtime_config +import opik.context_storage as context_storage +import opik.datetime_helpers as datetime_helpers + +LOGGER = logging.getLogger(__name__) + + +def track_claude_code( + query_func: Callable, + project_name: Optional[str] = None, +) -> Callable: + """Adds Opik tracking to a Claude Code SDK query function. + + This function wraps the Claude Code SDK `query` function to automatically + track all calls as Opik spans and traces. It creates: + + - A parent span for the entire query conversation + - Child spans for each assistant response in the conversation + - Child spans for system messages, user messages, and results + - Captures input prompts and options + - Tracks cost information from result messages + - Provides clear visibility into conversation flow + + Args: + query_func: The Claude Code SDK query function to track. + Typically `claude_code_sdk.query`. + project_name: The name of the project to log data to. + If not provided, uses the default project. + + Returns: + A wrapped version of the query function that automatically tracks calls. + + Example: + ```python + import opik + from claude_code_sdk import query + from opik.integrations.claude_code import track_claude_code + + # Configure Opik + opik.configure() + + # Track the query function + tracked_query = track_claude_code(query) + + # Use normally - tracking is automatic + async for message in tracked_query("What is Python?"): + if isinstance(message, AssistantMessage): + for block in message.content: + if isinstance(block, TextBlock): + print(f"Claude: {block.text}") + ``` + """ + + @track( + name="claude_code_query", + project_name=project_name, + tags=["claude_code", "conversation"], + ) + async def wrapper(*args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]: + # Get the current span (created by @track decorator) + current_span = opik_context.get_current_span_data() + if current_span is None: + # Fallback to just call the function if no span context + async for message in query_func(*args, **kwargs): + yield message + return + + # Update the span with metadata + metadata = { + "provider": LLMProvider.ANTHROPIC, + "function_name": query_func.__name__, + "created_from": "claude_code", + } + + current_span.update(metadata=metadata) + + # Initialize tracking variables + assistant_message_count = 0 + system_message_count = 0 + user_message_count = 0 + other_message_count = 0 + messages = [] # Store messages to look for tool use/result pairs + + try: + # Iterate through the original async generator + async for message in query_func(*args, **kwargs): + messages.append(message) + message_type = type(message).__name__ + + # Look for tool use + tool result pairs in the last two messages + if len(messages) >= 2: + prev_message = messages[-2] + curr_message = messages[-1] + + # Check if we have a tool use followed by a tool result + if (_is_assistant_message(prev_message) and + _is_user_message(curr_message)): + + # Check if prev contains tool use and curr contains tool result + tool_use_blocks = _extract_tool_use_blocks(prev_message) + tool_result_blocks = _extract_tool_result_blocks(curr_message) + + if tool_use_blocks and tool_result_blocks: + # Match tool use with tool result by ID + for tool_block in tool_use_blocks: + for tool_result_block in tool_result_blocks: + if tool_block.id == tool_result_block.tool_use_id: + assistant_message_count += 1 + _create_single_tool_span( + current_span, prev_message, curr_message, + tool_block, tool_result_block, assistant_message_count + ) + break + + # Yield both messages after processing + yield message + continue + + # Handle non-tool messages + if _is_assistant_message(message): + # Check if it's a tool use message (will be handled above if paired) + tool_use_blocks = _extract_tool_use_blocks(message) + if not tool_use_blocks: # Only create span for non-tool assistant messages + assistant_message_count += 1 + _create_and_log_assistant_span( + current_span, message, assistant_message_count + ) + + elif _is_system_message(message): + system_message_count += 1 + _create_and_log_system_span( + current_span, message, system_message_count + ) + + elif _is_user_message(message): + # Check if it contains tool results (will be handled above if paired) + tool_result_blocks = _extract_tool_result_blocks(message) + if not tool_result_blocks: # Only create span for non-tool user messages + user_message_count += 1 + _create_and_log_user_span(current_span, message, user_message_count) + + elif _is_result_message(message): + _create_and_log_result_span(current_span, message) + + else: + other_message_count += 1 + _create_and_log_other_span( + current_span, message, other_message_count + ) + + # Yield the message to maintain the original API + yield message + + except Exception as e: + LOGGER.error(f"Error in Claude Code query: {e}") + current_span.update(output={"error": str(e)}, metadata={"error": True}) + raise + + return wrapper + + +def track_claude_code_with_options( + query_func: Callable[..., Any], + project_name: Optional[str] = None, + **decorator_kwargs: Any, +) -> Callable[..., Any]: + """Advanced Claude Code tracking with additional configuration options. + + This function provides more control over the tracking behavior with + additional configuration options for the decorator. + + Args: + query_func: The Claude Code SDK query function to track. + project_name: The name of the project to log data to. + **decorator_kwargs: Additional arguments passed to the decorator. + + Returns: + A wrapped version of the query function with enhanced tracking. + + Example: + ```python + import opik + from claude_code_sdk import query + from opik.integrations.claude_code import track_claude_code_with_options + + # Configure Opik + opik.configure() + + # Track with custom options + tracked_query = track_claude_code_with_options( + query, + project_name="my_claude_project", + ) + + # Use normally + async for message in tracked_query("Explain quantum computing"): + # Handle messages... + pass + ``` + """ + # Use the same implementation but allow for future customization + return track_claude_code(query_func, project_name=project_name) + + +def _extract_inputs(args: tuple, kwargs: dict) -> dict: + """Extract and format inputs for span creation.""" + result: dict = {} + + # Extract prompt (first positional argument or from kwargs) + if args: + result["prompt"] = args[0] + elif "prompt" in kwargs: + result["prompt"] = kwargs["prompt"] + + # Extract options if present + if len(args) > 1: + options = args[1] + if options is not None: + result["options"] = _extract_options_dict(options) + elif "options" in kwargs and kwargs["options"] is not None: + result["options"] = _extract_options_dict(kwargs["options"]) + + return result + + +def _extract_options_dict(options: Any) -> dict: + """Extract options object as dictionary for logging.""" + try: + options_dict: dict = {} + if hasattr(options, "system_prompt") and options.system_prompt: + options_dict["system_prompt"] = options.system_prompt + if hasattr(options, "allowed_tools") and options.allowed_tools: + options_dict["allowed_tools"] = list(options.allowed_tools) + if hasattr(options, "max_turns") and options.max_turns is not None: + options_dict["max_turns"] = options.max_turns + if hasattr(options, "add_dirs") and options.add_dirs: + options_dict["add_dirs"] = list(options.add_dirs) + if ( + hasattr(options, "continue_conversation") + and options.continue_conversation is not None + ): + options_dict["continue_conversation"] = options.continue_conversation + if hasattr(options, "disallowed_tools") and options.disallowed_tools: + options_dict["disallowed_tools"] = list(options.disallowed_tools) + if hasattr(options, "extra_args") and options.extra_args: + options_dict["extra_args"] = list(options.extra_args) + if ( + hasattr(options, "max_thinking_tokens") + and options.max_thinking_tokens is not None + ): + options_dict["max_thinking_tokens"] = options.max_thinking_tokens + if hasattr(options, "mcp_servers") and options.mcp_servers: + options_dict["mcp_servers"] = list(options.mcp_servers) + return options_dict + except Exception as e: + LOGGER.warning(f"Failed to extract options: {e}") + return {"options": str(options)} + + +def _is_assistant_message(message: Any) -> bool: + """Check if message is an AssistantMessage.""" + return hasattr(message, "content") and hasattr(message.content, "__iter__") + + +def _is_system_message(message: Any) -> bool: + """Check if message is a SystemMessage.""" + return type(message).__name__ == "SystemMessage" + + +def _is_user_message(message: Any) -> bool: + """Check if message is a UserMessage.""" + return type(message).__name__ == "UserMessage" + + +def _is_result_message(message: Any) -> bool: + """Check if message is a ResultMessage.""" + return hasattr(message, "total_cost_usd") + + +def _extract_tool_use_blocks(message: Any) -> list: + """Extract tool use blocks from an assistant message.""" + tool_use_blocks = [] + if hasattr(message, "content") and hasattr(message.content, "__iter__"): + for block in message.content: + if hasattr(block, "id") and hasattr(block, "name") and hasattr(block, "input"): + # This is a tool use block + tool_use_blocks.append(block) + return tool_use_blocks + + +def _extract_tool_result_blocks(message: Any) -> list: + """Extract tool result blocks from a user message.""" + tool_result_blocks = [] + if hasattr(message, "content") and hasattr(message.content, "__iter__"): + for block in message.content: + if hasattr(block, "tool_use_id") and hasattr(block, "content"): + # This is a tool result block + tool_result_blocks.append(block) + return tool_result_blocks + + +def _create_single_tool_span( + parent_span_data: Any, + tool_use_message: Any, + tool_result_message: Any, + tool_block: Any, + tool_result_block: Any, + count: int +) -> None: + """Create a single span that contains both tool input and output.""" + + # Convert messages to dict for proper serialization + tool_use_dict = _dataclass_to_dict(tool_use_message) + tool_result_dict = _dataclass_to_dict(tool_result_message) + + # Create input data structure + input_data = { + "message_data": tool_use_dict, + "text_content": None, + } + + # Create output data structure + output_data = { + "message_data": tool_result_dict, + "text_content": None, + } + + # Create the span + tool_span_data = parent_span_data.create_child_span_data( + name=f"assistant_response_{count}", + type="tool", + input=input_data, + output=output_data, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "tool_id": tool_block.id, + "tool_name": tool_block.name, + "message_type": "ToolUse", + "response_number": count, + "is_error": getattr(tool_result_block, "is_error", False), + }, + tags=["claude_code", "tool_use", tool_block.name], + ) + + # Set end time and log the span immediately + tool_span_data.end_time = datetime_helpers.local_timestamp() + context_storage.add_span_data(tool_span_data) + + if tracing_runtime_config.is_tracing_active(): + client = opik_client.get_client_cached() + client.span(**tool_span_data.as_parameters) + + +def _create_tool_use_span(parent_span_data: Any, tool_block: Any, message: Any, count: int) -> Any: + """Create a tool use span but don't log it yet - wait for the result.""" + # Convert full message to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + # Create child span data for the tool use with complete message data as input + tool_span_data = parent_span_data.create_child_span_data( + name=f"assistant_response_{count}", + type="tool", + input={ + "message_data": message_dict, + "text_content": None, + }, + output={}, # Will be filled when we get the tool result + metadata={ + "provider": LLMProvider.ANTHROPIC, + "tool_id": tool_block.id, + "tool_name": tool_block.name, + "message_type": "ToolUse", + "response_number": count, + }, + tags=["claude_code", "tool_use", tool_block.name], + ) + + # Add some debugging + print(f"DEBUG: _create_tool_use_span - Created span assistant_response_{count} for tool {tool_block.name} with ID {tool_block.id}") + + return tool_span_data + + +def _complete_tool_span(tool_span_data: Any, tool_result_block: Any) -> None: + """Complete the tool span with the tool result and log it.""" + # Create a tool result message structure similar to what we'd see in the user message + tool_result_message_data = { + "content": [ + { + "tool_use_id": tool_result_block.tool_use_id, + "content": tool_result_block.content, + "is_error": getattr(tool_result_block, "is_error", False), + } + ] + } + + # Update the span with the tool result output + tool_span_data.output = { + "message_data": tool_result_message_data, + "text_content": None, + } + + # Update metadata + if hasattr(tool_result_block, "is_error") and tool_result_block.is_error: + tool_span_data.metadata["is_error"] = True + tool_span_data.metadata["error_message"] = str(tool_result_block.content) + + # Add debugging + print(f"DEBUG: _complete_tool_span - Completing tool span with tool_use_id {tool_result_block.tool_use_id}") + + # Log the complete tool span + _log_child_span(tool_span_data) + print(f"DEBUG: _complete_tool_span - Tool span logged successfully") + + +def _create_and_log_assistant_span( + parent_span_data: Any, message: Any, count: int +) -> None: + """Create and log a child span for an AssistantMessage.""" + # Convert dataclass to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + # Extract text content for readability (but don't filter based on it) + text_content: list = [] + try: + if hasattr(message, "content"): + for block in message.content: + if hasattr(block, "text") and block.text.strip(): + text_content.append(block.text.strip()) + except Exception as e: + LOGGER.debug(f"Could not extract text content from assistant message: {e}") + + # Create child span data + child_span_data = parent_span_data.create_child_span_data( + name=f"assistant_response_{count}", + type="llm", + input={"message_type": "assistant"}, + output={ + "message_data": message_dict, + "text_content": text_content if text_content else None, + }, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "message_type": "AssistantMessage", + "response_number": count, + "has_text_content": bool(text_content), + }, + tags=["claude_code", "assistant_response"], + ) + + _log_child_span(child_span_data) + + +def _create_and_log_system_span( + parent_span_data: Any, message: Any, count: int +) -> None: + """Create and log a child span for a SystemMessage.""" + # Convert dataclass to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + child_span_data = parent_span_data.create_child_span_data( + name=f"system_message_{count}", + type="general", + input={"message_type": "system"}, + output={"message_data": message_dict}, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "message_type": "SystemMessage", + "message_number": count, + }, + tags=["claude_code", "system_message"], + ) + + _log_child_span(child_span_data) + + +def _create_and_log_user_span(parent_span_data: Any, message: Any, count: int) -> None: + """Create and log a child span for a UserMessage.""" + # Convert dataclass to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + child_span_data = parent_span_data.create_child_span_data( + name=f"user_message_{count}", + type="general", + input={"message_type": "user"}, + output={"message_data": message_dict}, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "message_type": "UserMessage", + "message_number": count, + }, + tags=["claude_code", "user_message"], + ) + + _log_child_span(child_span_data) + + +def _create_and_log_result_span(parent_span_data: Any, message: Any) -> None: + """Create and log a child span for a ResultMessage.""" + # Convert dataclass to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + child_span_data = parent_span_data.create_child_span_data( + name="conversation_result", + type="general", + input={"message_type": "result"}, + output={"message_data": message_dict}, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "message_type": "ResultMessage", + }, + tags=["claude_code", "result_message"], + ) + + _log_child_span(child_span_data) + + +def _create_and_log_other_span(parent_span_data: Any, message: Any, count: int) -> None: + """Create and log a child span for other message types.""" + message_type: str = type(message).__name__ + # Convert dataclass to dict for proper serialization + message_dict: dict = _dataclass_to_dict(message) + + child_span_data = parent_span_data.create_child_span_data( + name=f"{message_type.lower()}_{count}", + type="general", + input={"message_type": message_type.lower()}, + output={"message_data": message_dict}, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "message_type": message_type, + "message_number": count, + }, + tags=["claude_code", "other_message"], + ) + + _log_child_span(child_span_data) + + +def _dataclass_to_dict(obj: Any) -> dict: + """Convert a dataclass to a dictionary for proper serialization.""" + try: + # Try to use asdict if it's a dataclass + from dataclasses import asdict + + return asdict(obj) + except (TypeError, AttributeError): + # Fallback to converting to dict manually + try: + return { + field: getattr(obj, field) for field in obj.__dataclass_fields__.keys() + } + except AttributeError: + # If not a dataclass, convert to string as fallback + return {"content": str(obj)} + + +def _log_child_span(child_span_data: Any) -> None: + """Helper function to log a child span.""" + # Set end time to mark span as completed + import opik.datetime_helpers as datetime_helpers + + child_span_data.end_time = datetime_helpers.local_timestamp() + + # Add to context and log the span + context_storage.add_span_data(child_span_data) + + # Log the complete span + client = opik_client.get_client_cached() + if tracing_runtime_config.is_tracing_active(): + client.span(**child_span_data.as_parameters) + + +def _finalize_parent_span( + parent_span: Any, messages: list, total_cost: float, counts: dict +) -> None: + """Update the parent span with final conversation summary.""" + + # Create output summary + output: dict = { + "conversation_summary": { + "total_messages": len(messages), + **counts, + }, + "total_cost_usd": total_cost, + } + + # Add final assistant response if available + last_assistant_response: Optional[str] = None + for msg in reversed(messages): + if _is_assistant_message(msg): + try: + text_parts: list = [] + for block in msg.content: + if hasattr(block, "text"): + text_parts.append(block.text) + if text_parts: + last_assistant_response = " ".join(text_parts) + break + except Exception: + pass + + if last_assistant_response: + output["final_response"] = last_assistant_response + + # Update parent span + parent_span.update( + output=output, + metadata={ + "provider": LLMProvider.ANTHROPIC, + "total_cost_usd": total_cost, + "message_count": len(messages), + **counts, + }, + ) + + +def _process_generator_items(generator_items: list) -> dict: + """Process the collected generator items and structure them for readability.""" + all_messages: list = [] + conversation_total_cost: float = 0.0 + + for item in generator_items: + try: + # Convert all messages to dict format + message_dict = _dataclass_to_dict(item) + message_type = type(item).__name__ + + # Extract additional info based on message type + message_info: dict = { + "type": message_type.lower(), + "message_data": message_dict, + } + + # Add type-specific information + if hasattr(item, "content") and hasattr(item.content, "__iter__"): + # AssistantMessage - extract text content for readability + text_content: list = [] + for block in item.content: + if hasattr(block, "text") and block.text.strip(): + text_content.append(block.text.strip()) + if text_content: + message_info["text_content"] = text_content + message_info["readable_content"] = " ".join(text_content) + elif hasattr(item, "total_cost_usd"): + # ResultMessage + cost = getattr(item, "total_cost_usd", 0.0) + if isinstance(cost, (int, float)): + conversation_total_cost = float(cost) + message_info["cost_usd"] = float(cost) + + all_messages.append(message_info) + + except Exception as e: + LOGGER.warning(f"Failed to process generator item {type(item)}: {e}") + all_messages.append( + { + "type": "error", + "message_data": {"content": str(item)}, + "error": str(e), + } + ) + + return { + "conversation_flow": all_messages, + "result_info": { + "total_cost_usd": conversation_total_cost, + "total_messages_count": len(all_messages), + }, + } diff --git a/sdks/python/tests/library_integration/claude_code/__init__.py b/sdks/python/tests/library_integration/claude_code/__init__.py new file mode 100644 index 0000000000..bd77614ffd --- /dev/null +++ b/sdks/python/tests/library_integration/claude_code/__init__.py @@ -0,0 +1 @@ +# Claude Code integration tests diff --git a/sdks/python/tests/library_integration/claude_code/requirements.txt b/sdks/python/tests/library_integration/claude_code/requirements.txt new file mode 100644 index 0000000000..6180f94015 --- /dev/null +++ b/sdks/python/tests/library_integration/claude_code/requirements.txt @@ -0,0 +1,7 @@ +# Claude Code SDK requirements for testing +# Note: This file contains the dependencies needed to test the Claude Code integration +# Users will need to install claude-code-sdk separately + +# For testing purposes, we use mock objects, so no external dependencies are required +# When the actual Claude Code SDK is available, add it here: +# claude-code-sdk>=0.1.0 diff --git a/sdks/python/tests/library_integration/claude_code/test_claude_code_integration.py b/sdks/python/tests/library_integration/claude_code/test_claude_code_integration.py new file mode 100644 index 0000000000..59d0ef82aa --- /dev/null +++ b/sdks/python/tests/library_integration/claude_code/test_claude_code_integration.py @@ -0,0 +1,347 @@ +from typing import AsyncGenerator, List, Any +import asyncio + +from opik.integrations.claude_code import ( + track_claude_code, + track_claude_code_with_options, +) + + +# Mock Claude Code SDK classes for testing +class MockTextBlock: + def __init__(self, text: str): + self.text = text + + +class MockAssistantMessage: + def __init__(self, content: List[MockTextBlock]): + self.content = content + + +class MockResultMessage: + def __init__(self, total_cost_usd: float = 0.0): + self.total_cost_usd = total_cost_usd + + +class MockClaudeCodeOptions: + def __init__( + self, + system_prompt: str = None, + allowed_tools: List[str] = None, + max_turns: int = None, + add_dirs: List[str] = None, + continue_conversation: bool = None, + disallowed_tools: List[str] = None, + extra_args: List[str] = None, + max_thinking_tokens: int = None, + mcp_servers: List[str] = None, + ): + self.system_prompt = system_prompt + self.allowed_tools = allowed_tools or [] + self.max_turns = max_turns + self.add_dirs = add_dirs or [] + self.continue_conversation = continue_conversation + self.disallowed_tools = disallowed_tools or [] + self.extra_args = extra_args or [] + self.max_thinking_tokens = max_thinking_tokens + self.mcp_servers = mcp_servers or [] + + def __str__(self): + return f"ClaudeCodeOptions(system_prompt={self.system_prompt}, allowed_tools={self.allowed_tools}, max_turns={self.max_turns})" + + def __repr__(self): + return self.__str__() + + +# Mock Claude Code query functions +async def mock_query_basic(prompt: str, options=None) -> AsyncGenerator[Any, None]: + """Mock Claude Code query function - basic response.""" + yield MockAssistantMessage( + [MockTextBlock("This is a mocked assistant response for creating files.")] + ) + yield MockResultMessage(total_cost_usd=0.001) + + +async def mock_query_with_tools(prompt: str, options=None) -> AsyncGenerator[Any, None]: + """Mock Claude Code query function - with tools and multiple responses.""" + yield MockAssistantMessage([MockTextBlock("I'll help you create that file.")]) + yield MockAssistantMessage( + [MockTextBlock("File created successfully with the content 'Hello, World!'.")] + ) + yield MockResultMessage(total_cost_usd=0.002) + + +OPIK_E2E_TESTS_PROJECT_NAME = "Default Project" + + +def test_track_claude_code__basic_query__happyflow(fake_backend): + """Test basic Claude Code query tracking creates proper traces and spans.""" + tracked_query = track_claude_code(mock_query_basic) + + async def run_query(): + async for message in tracked_query("Create a file called hello.txt"): + pass # Just consume the messages + + # Execute the async function + asyncio.run(run_query()) + + # Flush the tracker to ensure all data is processed + from opik.decorator import tracker + + tracker.flush_tracker() + + # Should have 1 trace with multiple spans: + # - 1 parent span for the entire query + # - Child spans for each message (1 assistant + 1 result) + assert ( + len(fake_backend.trace_trees) == 1 + ), f"Expected 1 trace tree, got {len(fake_backend.trace_trees)}" + + trace = fake_backend.trace_trees[0] + + # Verify we got the expected messages in the conversation flow + conversation_flow = trace.output["conversation_flow"] + assert len(conversation_flow) == 2 + assert conversation_flow[0]["type"] == "mockassistantmessage" + assert conversation_flow[1]["type"] == "mockresultmessage" + + # Verify trace structure + assert trace.name == "claude_code_query" + assert trace.input == {"args": ["Create a file called hello.txt"], "kwargs": {}} + + # Should have conversation flow in output + assert "conversation_flow" in trace.output + assert trace.output["result_info"]["total_messages_count"] == 2 + assert trace.output["result_info"]["total_cost_usd"] == 0.001 + + # Should have 1 main span created by @track decorator + assert len(trace.spans) == 1, f"Expected 1 main span, got {len(trace.spans)}" + + main_span = trace.spans[0] + assert main_span.name == "claude_code_query" + + # The main span should have 2 child spans - one for assistant message, one for result + assert ( + len(main_span.spans) == 2 + ), f"Expected 2 child spans, got {len(main_span.spans)}" + + # Find assistant response span + assistant_spans = [ + span for span in main_span.spans if "assistant_response" in span.name + ] + assert ( + len(assistant_spans) == 1 + ), f"Expected 1 assistant response span, got {len(assistant_spans)}" + + assistant_span = assistant_spans[0] + assert assistant_span.name == "assistant_response_1" + assert "text_content" in assistant_span.output + assert assistant_span.output["text_content"] == [ + "This is a mocked assistant response for creating files." + ] + + # Find result span + result_spans = [span for span in main_span.spans if "result" in span.name] + assert len(result_spans) == 1, f"Expected 1 result span, got {len(result_spans)}" + + result_span = result_spans[0] + assert result_span.name == "conversation_result" + assert "message_data" in result_span.output + + +def test_track_claude_code__with_options__metadata_extracted(fake_backend): + """Test tracking with ClaudeCodeOptions and metadata extraction.""" + tracked_query = track_claude_code(mock_query_with_tools) + + async def run_query(): + options = MockClaudeCodeOptions( + system_prompt="You are a helpful coding assistant.", + allowed_tools=["Read", "Write"], + max_turns=3, + ) + + async for message in tracked_query( + "Create a file called hello.txt with 'Hello, World!' in it", options=options + ): + pass # Just consume the messages + + # Execute the async function + asyncio.run(run_query()) + + # Flush the tracker to ensure all data is processed + from opik.decorator import tracker + + tracker.flush_tracker() + + # Check trace structure + assert len(fake_backend.trace_trees) == 1 + trace = fake_backend.trace_trees[0] + + # Verify we got the expected messages in the conversation flow + conversation_flow = trace.output["conversation_flow"] + assert len(conversation_flow) == 3 # 2 assistant messages + 1 result + + # Verify input contains args and options + assert trace.input["args"] == [ + "Create a file called hello.txt with 'Hello, World!' in it" + ] + assert "kwargs" in trace.input + assert "options" in trace.input["kwargs"] + + # The @track decorator converts the options object to string for JSON serialization + options_str = trace.input["kwargs"]["options"] + assert isinstance(options_str, str) + assert "You are a helpful coding assistant." in options_str + assert "Read" in options_str + assert "Write" in options_str + assert "3" in options_str + + # Verify multiple assistant responses in conversation flow + assert trace.output["result_info"]["total_messages_count"] == 3 + assert trace.output["result_info"]["total_cost_usd"] == 0.002 + + # Should have 1 main span with child spans for assistant messages + assert len(trace.spans) == 1, f"Expected 1 main span, got {len(trace.spans)}" + main_span = trace.spans[0] + + # Should have multiple child spans for assistant messages (2 assistant + 1 result = 3 total) + assistant_spans = [ + span for span in main_span.spans if "assistant_response" in span.name + ] + assert ( + len(assistant_spans) == 2 + ), f"Expected 2 assistant response spans, got {len(assistant_spans)}" + + # Verify individual assistant spans + assert assistant_spans[0].name == "assistant_response_1" + assert assistant_spans[0].output["text_content"] == [ + "I'll help you create that file." + ] + + assert assistant_spans[1].name == "assistant_response_2" + assert assistant_spans[1].output["text_content"] == [ + "File created successfully with the content 'Hello, World!'." + ] + + # Final response should be in the conversation flow + conversation_flow = trace.output["conversation_flow"] + assert len(conversation_flow) == 3 # 2 assistant + 1 result + + +def test_track_claude_code_with_options__custom_project(fake_backend): + """Test advanced tracking with custom project name.""" + custom_project = "custom_claude_project" + tracked_query = track_claude_code_with_options( + mock_query_basic, project_name=custom_project + ) + + async def run_query(): + async for message in tracked_query("Simple test query"): + pass # Just consume the messages + + # Execute the async function + asyncio.run(run_query()) + + # Flush the tracker to ensure all data is processed + from opik.decorator import tracker + + tracker.flush_tracker() + + # Check that trace was created + assert len(fake_backend.trace_trees) == 1 + trace = fake_backend.trace_trees[0] + + # Verify basic structure + assert trace.name == "claude_code_query" + assert trace.input == {"args": ["Simple test query"], "kwargs": {}} + + # Note: Project name verification depends on how the fake_backend handles projects + # In real usage, this would be sent to the specified project + + +def test_track_claude_code__error_handling(fake_backend): + """Test error handling in tracked Claude Code queries.""" + + async def mock_query_error(prompt: str, options=None) -> AsyncGenerator[Any, None]: + """Mock query function that raises an error.""" + yield MockAssistantMessage([MockTextBlock("Starting to process...")]) + raise ValueError("Simulated API error") + + tracked_query = track_claude_code(mock_query_error) + + async def run_query(): + try: + async for message in tracked_query("This will fail"): + pass # Just consume the messages + except ValueError: + pass # Expected error + + # Execute the async function + asyncio.run(run_query()) + + # Flush the tracker to ensure all data is processed + from opik.decorator import tracker + + tracker.flush_tracker() + + # Should still create a trace even with error + assert len(fake_backend.trace_trees) == 1 + trace = fake_backend.trace_trees[0] + + # Error should be captured - check both output and error_info + has_error_in_output = trace.output is not None and "error" in trace.output + has_error_info = trace.error_info is not None + + # At least one should contain error information + assert ( + has_error_in_output or has_error_info + ), f"No error information found. Output: {trace.output}, Error info: {trace.error_info}" + + if has_error_in_output: + assert "Simulated API error" in trace.output["error"] + if has_error_info: + assert "Simulated API error" in str(trace.error_info) + + +def test_track_claude_code__no_messages(fake_backend): + """Test tracking when no messages are yielded.""" + + async def mock_query_empty(prompt: str, options=None) -> AsyncGenerator[Any, None]: + """Mock query function that yields no messages.""" + # Yield nothing + if False: + yield + + tracked_query = track_claude_code(mock_query_empty) + + async def run_query(): + async for message in tracked_query("Empty query"): + pass # Just consume the messages + + # Execute the async function + asyncio.run(run_query()) + + # Flush the tracker to ensure all data is processed + from opik.decorator import tracker + + tracker.flush_tracker() + + # Should still create a trace + assert len(fake_backend.trace_trees) == 1 + trace = fake_backend.trace_trees[0] + + # Should have empty conversation flow + assert trace.output["result_info"]["total_messages_count"] == 0 + assert trace.output["result_info"]["total_cost_usd"] == 0.0 + + # Should have empty conversation flow since no messages + assert len(trace.output["conversation_flow"]) == 0 + + # Should have 1 main span but no child spans since no assistant messages were yielded + assert len(trace.spans) == 1, f"Expected 1 main span, got {len(trace.spans)}" + main_span = trace.spans[0] + + # Main span should have no child spans since no messages were yielded at all + assert ( + len(main_span.spans) == 0 + ), f"Expected 0 child spans (no messages), got {len(main_span.spans)}"