Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,7 @@ config
local_config.py
local_settings.py
.ruff_cache

.bedrock_agentcore.yaml
.dockerignore
Dockerfile
28 changes: 11 additions & 17 deletions src/bedrock_agentcore/runtime/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional

from starlette.applications import Starlette
Expand Down Expand Up @@ -62,8 +61,6 @@ def __init__(self, debug: bool = False):
self._task_counter_lock: threading.Lock = threading.Lock()
self._forced_ping_status: Optional[PingStatus] = None
self._last_status_update_time: float = time.time()
self._invocation_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="invocation")
self._invocation_semaphore = asyncio.Semaphore(2)

routes = [
Route("/invocations", self._handle_invocation, methods=["POST"]),
Expand Down Expand Up @@ -346,21 +343,18 @@ def run(self, port: int = 8080, host: Optional[str] = None):
uvicorn.run(self, host=host, port=port)

async def _invoke_handler(self, handler, request_context, takes_context, payload):
if self._invocation_semaphore.locked():
return JSONResponse({"error": "Server busy - maximum concurrent requests reached"}, status_code=503)
try:
args = (payload, request_context) if takes_context else (payload,)

async with self._invocation_semaphore:
try:
args = (payload, request_context) if takes_context else (payload,)
if asyncio.iscoroutinefunction(handler):
return await handler(*args)
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._invocation_executor, handler, *args)
except Exception as e:
handler_name = getattr(handler, "__name__", "unknown")
self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e)
raise
if asyncio.iscoroutinefunction(handler):
return await handler(*args)
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, handler, *args)
except Exception as e:
handler_name = getattr(handler, "__name__", "unknown")
self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e)
raise

def _handle_task_action(self, payload: dict) -> Optional[JSONResponse]:
"""Handle task management actions if present in payload."""
Expand Down
165 changes: 25 additions & 140 deletions tests/bedrock_agentcore/runtime/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from decimal import Decimal
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import MagicMock, patch

import pytest
from starlette.testclient import TestClient
Expand Down Expand Up @@ -235,25 +234,19 @@ def handler(payload):


class TestConcurrentInvocations:
"""Test concurrent invocation handling with thread pool and semaphore."""
"""Test concurrent invocation handling simplified without limits."""

def test_thread_pool_initialization(self):
"""Test ThreadPoolExecutor and Semaphore are properly initialized."""
def test_simplified_initialization(self):
"""Test that app initializes without thread pool and semaphore."""
app = BedrockAgentCoreApp()

# Check ThreadPoolExecutor is initialized with correct settings
assert hasattr(app, "_invocation_executor")
assert isinstance(app._invocation_executor, ThreadPoolExecutor)
assert app._invocation_executor._max_workers == 2

# Check Semaphore is initialized with correct limit
assert hasattr(app, "_invocation_semaphore")
assert isinstance(app._invocation_semaphore, asyncio.Semaphore)
assert app._invocation_semaphore._value == 2
# Check ThreadPoolExecutor and Semaphore are NOT initialized
assert not hasattr(app, "_invocation_executor")
assert not hasattr(app, "_invocation_semaphore")

@pytest.mark.asyncio
async def test_concurrent_invocations_within_limit(self):
"""Test that 2 concurrent requests work fine."""
async def test_concurrent_invocations_unlimited(self):
"""Test that multiple concurrent requests work without limits."""
app = BedrockAgentCoreApp()

# Create a slow sync handler
Expand All @@ -262,66 +255,26 @@ def handler(payload):
time.sleep(0.1) # Simulate work
return {"id": payload["id"]}

# Mock the executor to track calls
original_executor = app._invocation_executor
mock_executor = Mock(wraps=original_executor)
app._invocation_executor = mock_executor

# Create request context
from bedrock_agentcore.runtime.context import RequestContext

context = RequestContext(session_id=None)

# Start 2 concurrent invocations
# Start 3+ concurrent invocations (no limit)
task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1}))
task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2}))
task3 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 3}))

# Both should complete successfully
# All should complete successfully
result1 = await task1
result2 = await task2
result3 = await task3

assert result1 == {"id": 1}
assert result2 == {"id": 2}
assert result3 == {"id": 3}

# Verify executor was used for sync handlers
assert mock_executor.submit.call_count >= 2

@pytest.mark.asyncio
async def test_concurrent_invocations_exceed_limit(self):
"""Test that 3rd concurrent request gets 503 response."""
app = BedrockAgentCoreApp()

# Create a slow handler
@app.entrypoint
def handler(payload):
time.sleep(0.5) # Simulate long work
return {"id": payload["id"]}

# Create request context
from bedrock_agentcore.runtime.context import RequestContext

context = RequestContext(session_id=None)

# Start 2 invocations to fill the semaphore
task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1}))
task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2}))

# Wait a bit to ensure they've acquired the semaphore
await asyncio.sleep(0.1)

# Third invocation should get 503
result3 = await app._invoke_handler(handler, context, False, {"id": 3})

# Verify it's a JSONResponse with 503 status
from starlette.responses import JSONResponse

assert isinstance(result3, JSONResponse)
assert result3.status_code == 503
assert result3.body == b'{"error":"Server busy - maximum concurrent requests reached"}'

# Clean up the running tasks
await task1
await task2
# Removed: No more 503 responses since we removed concurrency limits

@pytest.mark.asyncio
async def test_async_handler_runs_in_event_loop(self):
Expand All @@ -338,10 +291,6 @@ async def handler(payload):
await asyncio.sleep(0.01)
return {"async": True}

# Mock the executor to ensure it's NOT used for async handlers
mock_executor = Mock()
app._invocation_executor = mock_executor

# Create request context
from bedrock_agentcore.runtime.context import RequestContext

Expand All @@ -353,12 +302,11 @@ async def handler(payload):
assert result == {"async": True}
# Async handler should run in main thread
assert handler_thread_id == threading.current_thread().ident
# Executor should NOT be used for async handlers
mock_executor.submit.assert_not_called()
# No executor needed for async handlers

@pytest.mark.asyncio
async def test_sync_handler_runs_in_thread_pool(self):
"""Test sync handlers run in thread pool."""
"""Test sync handlers run in default executor, not main event loop."""
app = BedrockAgentCoreApp()

# Track which thread the handler runs in
Expand All @@ -379,36 +327,14 @@ def handler(payload):
result = await app._invoke_handler(handler, context, False, {})

assert result == {"sync": True}
# Sync handler should NOT run in main thread
# Sync handler should NOT run in main thread (uses default executor)
assert handler_thread_id != threading.current_thread().ident

@pytest.mark.asyncio
async def test_semaphore_release_after_completion(self):
"""Test semaphore is properly released after request completion."""
app = BedrockAgentCoreApp()

@app.entrypoint
def handler(payload):
return {"result": "ok"}

# Create request context
from bedrock_agentcore.runtime.context import RequestContext

context = RequestContext(session_id=None)

# Check initial semaphore value
assert app._invocation_semaphore._value == 2

# Make a request
result = await app._invoke_handler(handler, context, False, {})
assert result == {"result": "ok"}

# Semaphore should be released
assert app._invocation_semaphore._value == 2
# Removed: No semaphore to test

@pytest.mark.asyncio
async def test_handler_exception_releases_semaphore(self):
"""Test semaphore is released even when handler fails."""
async def test_handler_exception_propagates(self):
"""Test handler exceptions are properly propagated."""
app = BedrockAgentCoreApp()

@app.entrypoint
Expand All @@ -420,16 +346,10 @@ def handler(payload):

context = RequestContext(session_id=None)

# Check initial semaphore value
assert app._invocation_semaphore._value == 2

# Make a request that will fail
# Exception should propagate
with pytest.raises(ValueError, match="Test error"):
await app._invoke_handler(handler, context, False, {})

# Semaphore should still be released
assert app._invocation_semaphore._value == 2

def test_no_thread_leak_on_repeated_requests(self):
"""Test that repeated requests don't leak threads."""
app = BedrockAgentCoreApp()
Expand All @@ -450,46 +370,11 @@ def handler(payload):
assert response.json() == {"id": i}

# Thread count should not have increased significantly
# Allow for some variance but no leak
# Allow for some variance but no leak (uses default executor)
final_thread_count = threading.active_count()
assert final_thread_count <= initial_thread_count + 2 # Thread pool has max 2 threads

@pytest.mark.asyncio
async def test_server_busy_error_format(self):
"""Test 503 response has correct error message format."""
app = BedrockAgentCoreApp()

# Fill the semaphore
await app._invocation_semaphore.acquire()
await app._invocation_semaphore.acquire()

@app.entrypoint
def handler(payload):
return {"ok": True}

# Create request context
from bedrock_agentcore.runtime.context import RequestContext

context = RequestContext(session_id=None)

# Try to invoke when semaphore is full
result = await app._invoke_handler(handler, context, False, {})

# Check response format
from starlette.responses import JSONResponse

assert isinstance(result, JSONResponse)
assert result.status_code == 503

# Parse the JSON body
import json

body = json.loads(result.body)
assert body == {"error": "Server busy - maximum concurrent requests reached"}
assert final_thread_count <= initial_thread_count + 10 # Default executor may create more threads

# Release semaphore
app._invocation_semaphore.release()
app._invocation_semaphore.release()
# Removed: No more server busy errors

def test_ping_endpoint_remains_sync(self):
"""Test that ping endpoint is not async."""
Expand Down
Loading