Skip to content

Conversation

ovidiutaralesca
Copy link
Contributor

Issue

On Python 3.13, the closed-queue signal is asyncio.QueueShutDown. event_consumer.py aliased asyncio.QueueShutDown to ClosedQueue, but asyncio.QueueEmpty can still arise on py3.13 and it's not handled properly in consume_all()'s except blocks.

How it's reproduced

In any AgentExecutor class:

# (...)
async def execute(
        self,
        request: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        await event_queue.enqueue_event(
            new_task(request.message)
        )

In python < 3.13:

  • Sending a message/send request works, a task is added to the InMemoryTaskStore.

In python >= 3.13:

  • Sending a message/send request crashes with exception:
File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 219, in _handle_requests
    return await self._process_non_streaming_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request_id, a2a_request, call_context
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 306, in _process_non_streaming_request
    handler_result = await self.handler.on_message_send(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request_obj, context
        ^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/jsonrpc_handler.py", line 87, in on_message_send
    task_or_message = await self.request_handler.on_message_send(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request.params, context
        ^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 282, in on_message_send
    ) = await result_aggregator.consume_and_break_on_interrupt(consumer)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 115, in consume_and_break_on_interrupt
    async for event in event_stream:
    ...<20 lines>...
            break
  File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 87, in consume_all
    raise self._exception
  File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 94, in consume_all
    event = await asyncio.wait_for(
            ^^^^^^^^^^^^^^^^^^^^^^^
        self.queue.dequeue_event(), timeout=self._timeout
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/opt/homebrew/Cellar/[email protected]/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 95, in dequeue_event
    raise asyncio.QueueEmpty('Queue is closed.')
asyncio.queues.QueueEmpty: Queue is closed.

Fix

Code

  • event_consumer.consume_all:

    • Catch (QueueClosed, asyncio.QueueEmpty) and break only when queue.is_closed() is True; otherwise continue polling.
  • event_queue.dequeue_event:

    • Version-guard the early-raise: on <3.13 keep raising QueueEmpty when closed+empty; on ≥3.13 skip the early raise and rely on queue.shutdown() to surface QueueShutDown exceptions.

Tests

Added 2 tests which fail on current implementation, but pass after the fix.

@ovidiutaralesca ovidiutaralesca requested a review from a team as a code owner August 14, 2025 15:18
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @ovidiutaralesca, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical issue where the event consumer would crash on Python 3.13 due to changes in how asyncio queues signal closure. The fix involves updating the event_consumer to correctly handle asyncio.QueueEmpty exceptions when the queue is truly closed, and adjusting the event_queue to align its dequeue_event behavior with Python 3.13's asyncio.QueueShutDown signal. These changes ensure the event processing remains stable and reliable across different Python versions, preventing unexpected crashes during event consumption.

Highlights

  • Python 3.13 Compatibility: The core issue addressed is a crash in the event_consumer on Python 3.13, caused by asyncio.QueueEmpty being raised when the queue is closed, which was not previously handled.
  • Robust Exception Handling: The consume_all method in event_consumer.py now explicitly catches asyncio.QueueEmpty alongside QueueClosed, ensuring that the consumer correctly identifies a truly closed queue before breaking the event consumption loop.
  • Version-Specific Queue Behavior: A version check (sys.version_info < (3, 13)) was added to event_queue.dequeue_event to differentiate behavior for Python 3.13+, preventing an early QueueEmpty raise and allowing asyncio.QueueShutDown to signal queue closure as intended by the newer Python version.
  • Enhanced Test Coverage: Two new asynchronous tests were added to test_event_consumer.py to validate the fix, covering scenarios where asyncio.QueueEmpty is raised on both closed and open queues, ensuring the consumer behaves as expected.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a compatibility issue with Python 3.13's asyncio.Queue behavior. The changes are well-implemented and correctly handle the different exception types for closed queues across Python versions. The modification in event_consumer.py to catch both QueueClosed and asyncio.QueueEmpty is a robust fix, and the version-specific logic in event_queue.py is appropriate. The newly added tests are comprehensive and validate the fix effectively, covering scenarios for both temporarily empty and permanently closed queues. Overall, this is a solid contribution that improves compatibility and reliability.

@ovidiutaralesca ovidiutaralesca force-pushed the fix_python_313_event_consumer branch from 5eb03ff to dbc4a12 Compare August 15, 2025 06:03
@mikeas1
Copy link
Contributor

mikeas1 commented Aug 19, 2025

Thanks for this fix!

@mikeas1 mikeas1 enabled auto-merge (squash) August 19, 2025 15:28
auto-merge was automatically disabled August 20, 2025 07:55

Head branch was pushed to by a user without write access

@ovidiutaralesca ovidiutaralesca force-pushed the fix_python_313_event_consumer branch from 7a95324 to 7b6d0a2 Compare August 20, 2025 08:00
@ovidiutaralesca
Copy link
Contributor Author

@mikeas1 thank you for the review!

I had to make a small adjustment to an existing test to work on py3.13.
I need a re-approval.

@holtskinner holtskinner changed the title fix: make event_consumer tolerant to closed queues on py3.13 fix: make event_consumer tolerant to closed queues on py3.13 Aug 20, 2025
@holtskinner holtskinner enabled auto-merge (squash) August 20, 2025 14:25
@holtskinner holtskinner merged commit a371461 into a2aproject:main Aug 20, 2025
5 checks passed
holtskinner pushed a commit that referenced this pull request Aug 20, 2025
🤖 I have created a release *beep* *boop*
---


##
[0.3.2](v0.3.1...v0.3.2)
(2025-08-20)


### Bug Fixes

* Add missing mime_type and name in proto conversion utils
([#408](#408))
([72b2ee7](72b2ee7))
* Add name field to FilePart protobuf message
([#403](#403))
([1dbe33d](1dbe33d))
* Client hangs when implementing `AgentExecutor` and `await`ing twice in
execute method
([#379](#379))
([c147a83](c147a83))
* **grpc:** Update `CreateTaskPushNotificationConfig` endpoint to
`/v1/{parent=tasks/*/pushNotificationConfigs}`
([#415](#415))
([73dddc3](73dddc3))
* make `event_consumer` tolerant to closed queues on py3.13
([#407](#407))
([a371461](a371461))
* non-blocking `send_message` server handler not invoke push
notification
([#394](#394))
([db82a65](db82a65))
* **proto:** Add `icon_url` to `a2a.proto`
([#416](#416))
([00703e3](00703e3))
* **spec:** Suggest Unique Identifier fields to be UUID
([#405](#405))
([da14cea](da14cea))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants