@@ -1299,40 +1299,15 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1299
1299
int i ;
1300
1300
size_t of_Brokers_cnt ;
1301
1301
int32_t response_Brokers_cnt = 0 ;
1302
- rd_kafka_resp_err_t err ;
1303
1302
1304
1303
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 3 ) {
1305
1304
/* Response: ThrottleTime */
1306
1305
rd_kafka_buf_write_i32 (resp , 0 );
1307
1306
}
1308
1307
1309
- err = rd_kafka_mock_next_request_error (mconn , resp );
1310
- if (err && rkbuf -> rkbuf_reqhdr .ApiVersion < 13 )
1311
- /* Top-level error code not supported */
1312
- err = RD_KAFKA_RESP_ERR_NO_ERROR ;
1313
-
1314
- if (err ) {
1315
- /* Response: #Brokers */
1316
- rd_kafka_buf_write_arraycnt (resp , 0 );
1317
-
1318
- if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 2 ) {
1319
- /* Response: ClusterId */
1320
- rd_kafka_buf_write_str (resp , mcluster -> id , -1 );
1321
- }
1322
-
1323
- if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 1 ) {
1324
- /* Response: ControllerId */
1325
- rd_kafka_buf_write_i32 (resp , mcluster -> controller_id );
1326
- }
1327
-
1328
- /* Response: #Topics */
1329
- rd_kafka_buf_write_arraycnt (resp , 0 );
1330
-
1331
- goto send_response ;
1332
- }
1333
-
1334
1308
/* Response: #Brokers */
1335
1309
of_Brokers_cnt = rd_kafka_buf_write_arraycnt_pos (resp );
1310
+
1336
1311
TAILQ_FOREACH (mrkb , & mcluster -> brokers , link ) {
1337
1312
if (!mrkb -> up )
1338
1313
continue ;
@@ -1343,7 +1318,7 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1343
1318
/* Response: Brokers.Port */
1344
1319
rd_kafka_buf_write_i32 (resp , (int32_t )mrkb -> port );
1345
1320
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 1 ) {
1346
- /* Response: Brokers.Rack */
1321
+ /* Response: Brokers.Rack (Matt's going to love this) */
1347
1322
rd_kafka_buf_write_str (resp , mrkb -> rack , -1 );
1348
1323
}
1349
1324
rd_kafka_buf_write_tags_empty (resp );
@@ -1480,23 +1455,18 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1480
1455
rd_kafka_buf_write_arraycnt (resp , 0 );
1481
1456
}
1482
1457
1483
- if (requested_topics )
1484
- rd_kafka_topic_partition_list_destroy (requested_topics );
1485
-
1486
- send_response :
1487
-
1488
1458
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 8 &&
1489
1459
rkbuf -> rkbuf_reqhdr .ApiVersion <= 10 ) {
1490
1460
/* ClusterAuthorizedOperations */
1491
1461
rd_kafka_buf_write_i32 (resp , INT32_MIN );
1492
1462
}
1493
1463
1494
- if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 13 ) {
1495
- /* Response: ErrorCode */
1496
- rd_kafka_buf_write_i16 (resp , err );
1497
- }
1498
-
1464
+ rd_kafka_buf_skip_tags (rkbuf );
1499
1465
rd_kafka_buf_write_tags_empty (resp );
1466
+
1467
+ if (requested_topics )
1468
+ rd_kafka_topic_partition_list_destroy (requested_topics );
1469
+
1500
1470
rd_kafka_mock_connection_send_response (mconn , resp );
1501
1471
1502
1472
return 0 ;
@@ -1546,8 +1516,7 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn,
1546
1516
1547
1517
if (!err && RD_KAFKAP_STR_LEN (& Key ) > 0 ) {
1548
1518
mrkb = rd_kafka_mock_cluster_get_coord (mcluster , KeyType , & Key );
1549
- if (!mrkb )
1550
- err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE ;
1519
+ rd_assert (mrkb );
1551
1520
}
1552
1521
1553
1522
if (!mrkb && !err )
@@ -3034,7 +3003,7 @@ const struct rd_kafka_mock_api_handler
3034
3003
[RD_KAFKAP_OffsetFetch ] = {0 , 6 , 6 , rd_kafka_mock_handle_OffsetFetch },
3035
3004
[RD_KAFKAP_OffsetCommit ] = {0 , 9 , 8 , rd_kafka_mock_handle_OffsetCommit },
3036
3005
[RD_KAFKAP_ApiVersion ] = {0 , 2 , 3 , rd_kafka_mock_handle_ApiVersion },
3037
- [RD_KAFKAP_Metadata ] = {0 , 13 , 9 , rd_kafka_mock_handle_Metadata },
3006
+ [RD_KAFKAP_Metadata ] = {0 , 12 , 9 , rd_kafka_mock_handle_Metadata },
3038
3007
[RD_KAFKAP_FindCoordinator ] = {0 , 3 , 3 ,
3039
3008
rd_kafka_mock_handle_FindCoordinator },
3040
3009
[RD_KAFKAP_InitProducerId ] = {0 , 4 , 2 ,
0 commit comments