@@ -1299,15 +1299,40 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1299
1299
int i ;
1300
1300
size_t of_Brokers_cnt ;
1301
1301
int32_t response_Brokers_cnt = 0 ;
1302
+ rd_kafka_resp_err_t err ;
1302
1303
1303
1304
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 3 ) {
1304
1305
/* Response: ThrottleTime */
1305
1306
rd_kafka_buf_write_i32 (resp , 0 );
1306
1307
}
1307
1308
1309
+ err = rd_kafka_mock_next_request_error (mconn , resp );
1310
+ if (err && rkbuf -> rkbuf_reqhdr .ApiVersion < 13 )
1311
+ /* Top-level error code not supported */
1312
+ err = RD_KAFKA_RESP_ERR_NO_ERROR ;
1313
+
1314
+ if (err ) {
1315
+ /* Response: #Brokers */
1316
+ rd_kafka_buf_write_arraycnt (resp , 0 );
1317
+
1318
+ if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 2 ) {
1319
+ /* Response: ClusterId */
1320
+ rd_kafka_buf_write_str (resp , mcluster -> id , -1 );
1321
+ }
1322
+
1323
+ if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 1 ) {
1324
+ /* Response: ControllerId */
1325
+ rd_kafka_buf_write_i32 (resp , mcluster -> controller_id );
1326
+ }
1327
+
1328
+ /* Response: #Topics */
1329
+ rd_kafka_buf_write_arraycnt (resp , 0 );
1330
+
1331
+ goto send_response ;
1332
+ }
1333
+
1308
1334
/* Response: #Brokers */
1309
1335
of_Brokers_cnt = rd_kafka_buf_write_arraycnt_pos (resp );
1310
-
1311
1336
TAILQ_FOREACH (mrkb , & mcluster -> brokers , link ) {
1312
1337
if (!mrkb -> up )
1313
1338
continue ;
@@ -1318,7 +1343,7 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1318
1343
/* Response: Brokers.Port */
1319
1344
rd_kafka_buf_write_i32 (resp , (int32_t )mrkb -> port );
1320
1345
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 1 ) {
1321
- /* Response: Brokers.Rack (Matt's going to love this) */
1346
+ /* Response: Brokers.Rack */
1322
1347
rd_kafka_buf_write_str (resp , mrkb -> rack , -1 );
1323
1348
}
1324
1349
rd_kafka_buf_write_tags_empty (resp );
@@ -1455,18 +1480,23 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
1455
1480
rd_kafka_buf_write_arraycnt (resp , 0 );
1456
1481
}
1457
1482
1483
+ if (requested_topics )
1484
+ rd_kafka_topic_partition_list_destroy (requested_topics );
1485
+
1486
+ send_response :
1487
+
1458
1488
if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 8 &&
1459
1489
rkbuf -> rkbuf_reqhdr .ApiVersion <= 10 ) {
1460
1490
/* ClusterAuthorizedOperations */
1461
1491
rd_kafka_buf_write_i32 (resp , INT32_MIN );
1462
1492
}
1463
1493
1464
- rd_kafka_buf_skip_tags (rkbuf );
1465
- rd_kafka_buf_write_tags_empty (resp );
1466
-
1467
- if (requested_topics )
1468
- rd_kafka_topic_partition_list_destroy (requested_topics );
1494
+ if (rkbuf -> rkbuf_reqhdr .ApiVersion >= 13 ) {
1495
+ /* Response: ErrorCode */
1496
+ rd_kafka_buf_write_i16 (resp , err );
1497
+ }
1469
1498
1499
+ rd_kafka_buf_write_tags_empty (resp );
1470
1500
rd_kafka_mock_connection_send_response (mconn , resp );
1471
1501
1472
1502
return 0 ;
@@ -1516,7 +1546,8 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn,
1516
1546
1517
1547
if (!err && RD_KAFKAP_STR_LEN (& Key ) > 0 ) {
1518
1548
mrkb = rd_kafka_mock_cluster_get_coord (mcluster , KeyType , & Key );
1519
- rd_assert (mrkb );
1549
+ if (!mrkb )
1550
+ err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE ;
1520
1551
}
1521
1552
1522
1553
if (!mrkb && !err )
@@ -3003,7 +3034,7 @@ const struct rd_kafka_mock_api_handler
3003
3034
[RD_KAFKAP_OffsetFetch ] = {0 , 6 , 6 , rd_kafka_mock_handle_OffsetFetch },
3004
3035
[RD_KAFKAP_OffsetCommit ] = {0 , 9 , 8 , rd_kafka_mock_handle_OffsetCommit },
3005
3036
[RD_KAFKAP_ApiVersion ] = {0 , 2 , 3 , rd_kafka_mock_handle_ApiVersion },
3006
- [RD_KAFKAP_Metadata ] = {0 , 12 , 9 , rd_kafka_mock_handle_Metadata },
3037
+ [RD_KAFKAP_Metadata ] = {0 , 13 , 9 , rd_kafka_mock_handle_Metadata },
3007
3038
[RD_KAFKAP_FindCoordinator ] = {0 , 3 , 3 ,
3008
3039
rd_kafka_mock_handle_FindCoordinator },
3009
3040
[RD_KAFKAP_InitProducerId ] = {0 , 4 , 2 ,
0 commit comments