From c598a200af6d0d547b6074e5eb5724a2ddca7521 Mon Sep 17 00:00:00 2001 From: Abhimanyu Siwach Date: Thu, 7 Aug 2025 13:35:06 -0700 Subject: [PATCH 1/2] chore: remove concurrency checks and simplify thread pool handling --- .gitignore | 4 + src/bedrock_agentcore/runtime/app.py | 28 ++-- tests/bedrock_agentcore/runtime/test_app.py | 165 +++----------------- 3 files changed, 40 insertions(+), 157 deletions(-) diff --git a/.gitignore b/.gitignore index 1330d9c..1b73304 100644 --- a/.gitignore +++ b/.gitignore @@ -223,3 +223,7 @@ config local_config.py local_settings.py .ruff_cache + +.bedrock_agentcore.yaml +.dockerignore +Dockerfile \ No newline at end of file diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py index 00d8a91..e07fc87 100644 --- a/src/bedrock_agentcore/runtime/app.py +++ b/src/bedrock_agentcore/runtime/app.py @@ -11,7 +11,6 @@ import threading import time import uuid -from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, Dict, Optional from starlette.applications import Starlette @@ -62,8 +61,6 @@ def __init__(self, debug: bool = False): self._task_counter_lock: threading.Lock = threading.Lock() self._forced_ping_status: Optional[PingStatus] = None self._last_status_update_time: float = time.time() - self._invocation_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="invocation") - self._invocation_semaphore = asyncio.Semaphore(2) routes = [ Route("/invocations", self._handle_invocation, methods=["POST"]), @@ -346,21 +343,18 @@ def run(self, port: int = 8080, host: Optional[str] = None): uvicorn.run(self, host=host, port=port) async def _invoke_handler(self, handler, request_context, takes_context, payload): - if self._invocation_semaphore.locked(): - return JSONResponse({"error": "Server busy - maximum concurrent requests reached"}, status_code=503) + try: + args = (payload, request_context) if takes_context else (payload,) - async with self._invocation_semaphore: - try: - args = (payload, request_context) if takes_context else (payload,) - if asyncio.iscoroutinefunction(handler): - return await handler(*args) - else: - loop = asyncio.get_event_loop() - return await loop.run_in_executor(self._invocation_executor, handler, *args) - except Exception as e: - handler_name = getattr(handler, "__name__", "unknown") - self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e) - raise + if asyncio.iscoroutinefunction(handler): + return await handler(*args) + else: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, handler, *args) + except Exception as e: + handler_name = getattr(handler, "__name__", "unknown") + self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e) + raise def _handle_task_action(self, payload: dict) -> Optional[JSONResponse]: """Handle task management actions if present in payload.""" diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py index 9faf125..110dbdf 100644 --- a/tests/bedrock_agentcore/runtime/test_app.py +++ b/tests/bedrock_agentcore/runtime/test_app.py @@ -3,10 +3,9 @@ import os import threading import time -from concurrent.futures import ThreadPoolExecutor from datetime import datetime from decimal import Decimal -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import MagicMock, patch import pytest from starlette.testclient import TestClient @@ -235,25 +234,19 @@ def handler(payload): class TestConcurrentInvocations: - """Test concurrent invocation handling with thread pool and semaphore.""" + """Test concurrent invocation handling simplified without limits.""" - def test_thread_pool_initialization(self): - """Test ThreadPoolExecutor and Semaphore are properly initialized.""" + def test_simplified_initialization(self): + """Test that app initializes without thread pool and semaphore.""" app = BedrockAgentCoreApp() - # Check ThreadPoolExecutor is initialized with correct settings - assert hasattr(app, "_invocation_executor") - assert isinstance(app._invocation_executor, ThreadPoolExecutor) - assert app._invocation_executor._max_workers == 2 - - # Check Semaphore is initialized with correct limit - assert hasattr(app, "_invocation_semaphore") - assert isinstance(app._invocation_semaphore, asyncio.Semaphore) - assert app._invocation_semaphore._value == 2 + # Check ThreadPoolExecutor and Semaphore are NOT initialized + assert not hasattr(app, "_invocation_executor") + assert not hasattr(app, "_invocation_semaphore") @pytest.mark.asyncio - async def test_concurrent_invocations_within_limit(self): - """Test that 2 concurrent requests work fine.""" + async def test_concurrent_invocations_unlimited(self): + """Test that multiple concurrent requests work without limits.""" app = BedrockAgentCoreApp() # Create a slow sync handler @@ -262,66 +255,26 @@ def handler(payload): time.sleep(0.1) # Simulate work return {"id": payload["id"]} - # Mock the executor to track calls - original_executor = app._invocation_executor - mock_executor = Mock(wraps=original_executor) - app._invocation_executor = mock_executor - # Create request context from bedrock_agentcore.runtime.context import RequestContext context = RequestContext(session_id=None) - # Start 2 concurrent invocations + # Start 3+ concurrent invocations (no limit) task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1})) task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2})) + task3 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 3})) - # Both should complete successfully + # All should complete successfully result1 = await task1 result2 = await task2 + result3 = await task3 assert result1 == {"id": 1} assert result2 == {"id": 2} + assert result3 == {"id": 3} - # Verify executor was used for sync handlers - assert mock_executor.submit.call_count >= 2 - - @pytest.mark.asyncio - async def test_concurrent_invocations_exceed_limit(self): - """Test that 3rd concurrent request gets 503 response.""" - app = BedrockAgentCoreApp() - - # Create a slow handler - @app.entrypoint - def handler(payload): - time.sleep(0.5) # Simulate long work - return {"id": payload["id"]} - - # Create request context - from bedrock_agentcore.runtime.context import RequestContext - - context = RequestContext(session_id=None) - - # Start 2 invocations to fill the semaphore - task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1})) - task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2})) - - # Wait a bit to ensure they've acquired the semaphore - await asyncio.sleep(0.1) - - # Third invocation should get 503 - result3 = await app._invoke_handler(handler, context, False, {"id": 3}) - - # Verify it's a JSONResponse with 503 status - from starlette.responses import JSONResponse - - assert isinstance(result3, JSONResponse) - assert result3.status_code == 503 - assert result3.body == b'{"error":"Server busy - maximum concurrent requests reached"}' - - # Clean up the running tasks - await task1 - await task2 + # Removed: No more 503 responses since we removed concurrency limits @pytest.mark.asyncio async def test_async_handler_runs_in_event_loop(self): @@ -338,10 +291,6 @@ async def handler(payload): await asyncio.sleep(0.01) return {"async": True} - # Mock the executor to ensure it's NOT used for async handlers - mock_executor = Mock() - app._invocation_executor = mock_executor - # Create request context from bedrock_agentcore.runtime.context import RequestContext @@ -353,12 +302,11 @@ async def handler(payload): assert result == {"async": True} # Async handler should run in main thread assert handler_thread_id == threading.current_thread().ident - # Executor should NOT be used for async handlers - mock_executor.submit.assert_not_called() + # No executor needed for async handlers @pytest.mark.asyncio async def test_sync_handler_runs_in_thread_pool(self): - """Test sync handlers run in thread pool.""" + """Test sync handlers run in default executor, not main event loop.""" app = BedrockAgentCoreApp() # Track which thread the handler runs in @@ -379,36 +327,14 @@ def handler(payload): result = await app._invoke_handler(handler, context, False, {}) assert result == {"sync": True} - # Sync handler should NOT run in main thread + # Sync handler should NOT run in main thread (uses default executor) assert handler_thread_id != threading.current_thread().ident - @pytest.mark.asyncio - async def test_semaphore_release_after_completion(self): - """Test semaphore is properly released after request completion.""" - app = BedrockAgentCoreApp() - - @app.entrypoint - def handler(payload): - return {"result": "ok"} - - # Create request context - from bedrock_agentcore.runtime.context import RequestContext - - context = RequestContext(session_id=None) - - # Check initial semaphore value - assert app._invocation_semaphore._value == 2 - - # Make a request - result = await app._invoke_handler(handler, context, False, {}) - assert result == {"result": "ok"} - - # Semaphore should be released - assert app._invocation_semaphore._value == 2 + # Removed: No semaphore to test @pytest.mark.asyncio - async def test_handler_exception_releases_semaphore(self): - """Test semaphore is released even when handler fails.""" + async def test_handler_exception_propagates(self): + """Test handler exceptions are properly propagated.""" app = BedrockAgentCoreApp() @app.entrypoint @@ -420,16 +346,10 @@ def handler(payload): context = RequestContext(session_id=None) - # Check initial semaphore value - assert app._invocation_semaphore._value == 2 - - # Make a request that will fail + # Exception should propagate with pytest.raises(ValueError, match="Test error"): await app._invoke_handler(handler, context, False, {}) - # Semaphore should still be released - assert app._invocation_semaphore._value == 2 - def test_no_thread_leak_on_repeated_requests(self): """Test that repeated requests don't leak threads.""" app = BedrockAgentCoreApp() @@ -450,46 +370,11 @@ def handler(payload): assert response.json() == {"id": i} # Thread count should not have increased significantly - # Allow for some variance but no leak + # Allow for some variance but no leak (uses default executor) final_thread_count = threading.active_count() - assert final_thread_count <= initial_thread_count + 2 # Thread pool has max 2 threads - - @pytest.mark.asyncio - async def test_server_busy_error_format(self): - """Test 503 response has correct error message format.""" - app = BedrockAgentCoreApp() - - # Fill the semaphore - await app._invocation_semaphore.acquire() - await app._invocation_semaphore.acquire() - - @app.entrypoint - def handler(payload): - return {"ok": True} - - # Create request context - from bedrock_agentcore.runtime.context import RequestContext - - context = RequestContext(session_id=None) - - # Try to invoke when semaphore is full - result = await app._invoke_handler(handler, context, False, {}) - - # Check response format - from starlette.responses import JSONResponse - - assert isinstance(result, JSONResponse) - assert result.status_code == 503 - - # Parse the JSON body - import json - - body = json.loads(result.body) - assert body == {"error": "Server busy - maximum concurrent requests reached"} + assert final_thread_count <= initial_thread_count + 10 # Default executor may create more threads - # Release semaphore - app._invocation_semaphore.release() - app._invocation_semaphore.release() + # Removed: No more server busy errors def test_ping_endpoint_remains_sync(self): """Test that ping endpoint is not async.""" From 770029b236ea76e793f62b6181b25e67477487e6 Mon Sep 17 00:00:00 2001 From: Abhimanyu Siwach Date: Thu, 7 Aug 2025 13:38:55 -0700 Subject: [PATCH 2/2] fix gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 1b73304..b7c8535 100644 --- a/.gitignore +++ b/.gitignore @@ -226,4 +226,4 @@ local_settings.py .bedrock_agentcore.yaml .dockerignore -Dockerfile \ No newline at end of file +Dockerfile