Skip to content

Conversation

PratRanj07
Copy link
Contributor

Currently, in rd_kafka_cgrp_consumer_assignment_with_metadata we only check whether the metadata for the topic is present in cache.
This PR changes the logic to also verify that the partitions listed in the assignment match the current consumer assignment.
Added a test case to cover this new partition assignment matching logic.

@PratRanj07 PratRanj07 requested a review from a team as a code owner September 11, 2025 06:46
@Copilot Copilot AI review requested due to automatic review settings September 11, 2025 06:46
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements KIP-848 partition metadata refresh functionality to trigger metadata refresh when partition count increases. The change modifies the consumer group assignment logic to verify that partitions listed in assignments match the current cached metadata, and triggers a refresh when partitions are missing.

  • Updates consumer assignment validation to check for partition existence in cached metadata
  • Adds debug logging when assigned partitions are not found in metadata cache
  • Includes comprehensive test coverage for the new partition assignment matching logic

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
src/rdkafka_cgrp.c Core implementation: adds partition existence validation and logging for metadata refresh triggers
tests/0154-metadata_refresh.c New test file implementing integration test for KIP-848 functionality
tests/test.c Test registration: adds declaration and entry for the new metadata refresh test
tests/CMakeLists.txt Build system: includes new test file in CMake configuration
win32/tests/tests.vcxproj Build system: includes new test file in Visual Studio project

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

test_conf_set(conf, "group.id", group);
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "debug", "cgrp, protocol");
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb);
Copy link
Preview

Copilot AI Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log callback parameter is hardcoded to test_metadata_log_cb instead of using the log_cb parameter passed to the function. This makes the log_cb parameter unused and could lead to incorrect behavior.

Suggested change
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb);
rd_kafka_conf_set_log_cb(conf, log_cb);

Copilot uses AI. Check for mistakes.

}

int main_0154_metadata_refresh(int argc, char **argv) {
if (!test_consumer_group_protocol_classic())
Copy link
Preview

Copilot AI Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is inverted - the test only runs when the classic protocol is NOT being used, but the test appears designed for classic consumer group protocol based on the heartbeat logs it's looking for. This should be if (test_consumer_group_protocol_classic()).

Suggested change
if (!test_consumer_group_protocol_classic())
if (test_consumer_group_protocol_classic())

Copilot uses AI. Check for mistakes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant