-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[KIP-848]: Trigger metadata refresh when partition count increases #5190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…next_target_assignment
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements KIP-848 partition metadata refresh functionality to trigger metadata refresh when partition count increases. The change modifies the consumer group assignment logic to verify that partitions listed in assignments match the current cached metadata, and triggers a refresh when partitions are missing.
- Updates consumer assignment validation to check for partition existence in cached metadata
- Adds debug logging when assigned partitions are not found in metadata cache
- Includes comprehensive test coverage for the new partition assignment matching logic
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
src/rdkafka_cgrp.c |
Core implementation: adds partition existence validation and logging for metadata refresh triggers |
tests/0154-metadata_refresh.c |
New test file implementing integration test for KIP-848 functionality |
tests/test.c |
Test registration: adds declaration and entry for the new metadata refresh test |
tests/CMakeLists.txt |
Build system: includes new test file in CMake configuration |
win32/tests/tests.vcxproj |
Build system: includes new test file in Visual Studio project |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
tests/0154-metadata_refresh.c
Outdated
test_conf_set(conf, "group.id", group); | ||
test_conf_set(conf, "auto.offset.reset", "earliest"); | ||
test_conf_set(conf, "debug", "cgrp, protocol"); | ||
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log callback parameter is hardcoded to test_metadata_log_cb
instead of using the log_cb
parameter passed to the function. This makes the log_cb
parameter unused and could lead to incorrect behavior.
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb); | |
rd_kafka_conf_set_log_cb(conf, log_cb); |
Copilot uses AI. Check for mistakes.
tests/0154-metadata_refresh.c
Outdated
} | ||
|
||
int main_0154_metadata_refresh(int argc, char **argv) { | ||
if (!test_consumer_group_protocol_classic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic is inverted - the test only runs when the classic protocol is NOT being used, but the test appears designed for classic consumer group protocol based on the heartbeat logs it's looking for. This should be if (test_consumer_group_protocol_classic())
.
if (!test_consumer_group_protocol_classic()) | |
if (test_consumer_group_protocol_classic()) |
Copilot uses AI. Check for mistakes.
Currently, in rd_kafka_cgrp_consumer_assignment_with_metadata we only check whether the metadata for the topic is present in cache.
This PR changes the logic to also verify that the partitions listed in the assignment match the current consumer assignment.
Added a test case to cover this new partition assignment matching logic.