diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index d87ab2c05..7594f95b2 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -2914,7 +2914,7 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
     rd_kafka_cgrp_t *rkcg,
     rd_kafka_topic_partition_list_t *assignment,
     rd_list_t **missing_topic_ids) {
-        int i;
+        int i, j;
         rd_kafka_t *rk = rkcg->rkcg_rk;
         rd_kafka_topic_partition_list_t *assignment_with_metadata =
             rd_kafka_topic_partition_list_new(assignment->cnt);
@@ -2925,13 +2925,25 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
                 rd_kafka_Uuid_t request_topic_id =
                     rd_kafka_topic_partition_get_topic_id(
                         &assignment->elems[i]);
+                int partition = assignment->elems[i].partition;
+                rd_bool_t partition_found = rd_false;
 
                 rd_kafka_rdlock(rk);
                 rkmce = rd_kafka_metadata_cache_find_by_id(rk, request_topic_id,
                                                            1);
-                if (rkmce)
+                if (rkmce) {
                         topic_name = rd_strdup(rkmce->rkmce_mtopic.topic);
+                        /* Check whether the assigned partition exists in the
+                         * cached metadata for this topic. */
+                        int pcnt = rkmce->rkmce_mtopic.partition_cnt;
+                        for (j = 0; j < pcnt; j++) {
+                                if (rkmce->rkmce_mtopic.partitions[j].id == partition) {
+                                        partition_found = rd_true;
+                                        break;
+                                }
+                        }
+                }
 
                 rd_kafka_rdunlock(rk);
 
                 if (unlikely(!topic_name)) {
@@ -2941,7 +2953,8 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
                         topic_name = rd_strdup(rktpar->topic);
                 }
 
-                if (likely(topic_name != NULL)) {
+                /* Skip when the partition is missing from cached metadata. */
+                if (likely(topic_name != NULL) && partition_found) {
                         rd_kafka_topic_partition_list_add_with_topic_name_and_id(
                             assignment_with_metadata, request_topic_id,
                             topic_name, assignment->elems[i].partition);
@@ -2949,6 +2962,20 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
                         continue;
                 }
 
+                if (!partition_found)
+                        rd_kafka_dbg(
+                            rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+                            "Partition assigned to this consumer is not "
+                            "present in cached metadata for topic id: %s. "
+                            "This may indicate that the topic's partition "
+                            "count has increased and metadata needs to be "
+                            "refreshed",
+                            rd_kafka_Uuid_base64str(&request_topic_id));
+
+                /* topic_name was strdup'd but not consumed on this path:
+                 * free it to avoid a leak (it may be NULL here). */
+                RD_IF_FREE(topic_name, rd_free);
+
                 if (missing_topic_ids) {
                         if (unlikely(!*missing_topic_ids))
                                 *missing_topic_ids =
diff --git a/tests/0147-consumer_group_consumer_mock.c b/tests/0147-consumer_group_consumer_mock.c
index bd1d3711e..b4e25e798 100644
--- a/tests/0147-consumer_group_consumer_mock.c
+++ b/tests/0147-consumer_group_consumer_mock.c
@@ -27,10 +27,13 @@
  */
 
 #include "test.h"
+#include "rdkafka.h"
 #include "../src/rdkafka_proto.h"
 
 #include <stdarg.h>
+#include <string.h>
+#include <stdlib.h>
 
 
 /**
@@ -928,6 +931,123 @@ static void do_test_quick_unsubscribe_tests(void) {
         }
 }
 
+/**
+ * @brief Verify that a KIP-848 consumer detects assigned partitions
+ *        missing from cached metadata after a partition count
+ *        increase: create a topic with 2 partitions, wait for the
+ *        initial group assignment, grow the topic to 4 partitions
+ *        and expect the heartbeat and metadata refresh debug logs.
+ */
+
+/* Log-sequence tracking flags, set from the log callback thread.
+ * NOTE(review): volatile does not provide thread synchronization;
+ * atomics would be stricter, but this suffices for a test. */
+static volatile int seen_heartbeat_req  = 0;
+static volatile int seen_heartbeat_resp = 0;
+static volatile int seen_metadata_log   = 0;
+
+static void reset_log_tracking(void) {
+        seen_heartbeat_req  = 0;
+        seen_heartbeat_resp = 0;
+        seen_metadata_log   = 0;
+}
+
+/* Poll until the metadata refresh log is seen or \p timeout_ms
+ * elapses, then assert the full log sequence was observed. */
+static void wait_for_metadata_refresh_log(int timeout_ms) {
+        int elapsed = 0;
+        while (elapsed < timeout_ms && !seen_metadata_log) {
+                rd_usleep(500 * 1000, NULL); /* 500 ms */
+                elapsed += 500;
+        }
+        TEST_ASSERT(seen_heartbeat_req,
+                    "Expected HeartbeatRequest log not seen after "
+                    "partition creation");
+        TEST_ASSERT(seen_heartbeat_resp,
+                    "Expected HeartbeatResponse log not seen after "
+                    "partition creation");
+        TEST_ASSERT(seen_metadata_log,
+                    "Expected metadata refresh log not seen after "
+                    "partition creation and heartbeat");
+}
+
+/* Log callback: track the heartbeat request/response and the
+ * metadata refresh log lines, in order. */
+static void test_metadata_log_cb(const rd_kafka_t *rk,
+                                 int level,
+                                 const char *fac,
+                                 const char *buf) {
+        if (strstr(buf, "Sent ConsumerGroupHeartbeatRequest"))
+                seen_heartbeat_req = 1;
+        if (seen_heartbeat_req &&
+            strstr(buf, "Received ConsumerGroupHeartbeatResponse"))
+                seen_heartbeat_resp = 1;
+        if (seen_heartbeat_resp &&
+            strstr(buf, "Partition assigned to this consumer is not "
+                        "present in cached metadata"))
+                seen_metadata_log = 1;
+}
+
+/* Create one consumer in \p group with the metadata log callback set. */
+static rd_kafka_t *create_consumers(const char *group) {
+        rd_kafka_conf_t *conf;
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "group.id", group);
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        /* cgrp + protocol debug is required for the heartbeat and
+         * metadata log lines to reach the log callback. */
+        test_conf_set(conf, "debug", "cgrp,protocol");
+        rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb);
+
+        return test_create_consumer(group, NULL, conf, NULL);
+}
+
+static void do_test_setup_and_run_metadata_refresh_test(void) {
+        const char *topic      = test_mk_topic_name("cgrp_metadata", 1);
+        const char *group      = "grp_metadata";
+        int initial_partitions = 2;
+        int new_partitions     = 4;
+        rd_kafka_t *c1, *c2, *rk;
+
+        SUB_TEST_QUICK();
+
+        TEST_SAY("Creating topic %s with %d partitions\n", topic,
+                 initial_partitions);
+        test_create_topic(NULL, topic, initial_partitions, 1);
+
+        TEST_SAY("Creating consumers\n");
+        c1 = create_consumers(group);
+        c2 = create_consumers(group);
+
+        rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);
+
+        TEST_SAY("Subscribing to topic %s\n", topic);
+        test_consumer_subscribe(c1, topic);
+        test_consumer_subscribe(c2, topic);
+
+        /* Wait for the initial assignment. */
+        test_consumer_wait_assignment(c1, rd_false);
+        test_consumer_wait_assignment(c2, rd_false);
+
+        TEST_SAY("Increasing partition count to %d\n", new_partitions);
+        test_create_partitions(rk, topic, new_partitions);
+
+        /* Only consider logs emitted after the partition creation. */
+        reset_log_tracking();
+
+        /* Wait up to 10 s for the expected log sequence. */
+        wait_for_metadata_refresh_log(10000);
+
+        TEST_SAY("Closing consumers\n");
+        test_consumer_close(c1);
+        test_consumer_close(c2);
+        rd_kafka_destroy(c1);
+        rd_kafka_destroy(c2);
+        rd_kafka_destroy(rk);
+
+        SUB_TEST_PASS();
+}
+
 
 int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
         TEST_SKIP_MOCK_CLUSTER(0);
@@ -948,5 +1068,7 @@ int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
 
         do_test_quick_unsubscribe_tests();
 
+        do_test_setup_and_run_metadata_refresh_test();
+
         return 0;
 }