Skip to content

Commit 48a7c7d

Browse files
authored
Fix compression types read issue in GetTelemetrySubscriptions response with big-endian architectures (#5183)
* Fix compression types read issue in GetTelemetrySubscriptions response for big-endian architectures * Decrease allocated buffer size in `rd_kafka_PushTelemetryRequest` and explicitly cast the enum
1 parent fa73c2d commit 48a7c7d

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,24 @@
1+
# librdkafka v2.12.0
2+
3+
librdkafka v2.12.0 is a feature release:
4+
5+
* Fix compression types read issue in GetTelemetrySubscriptions response
6+
for big-endian architectures (#5183, @paravoid).
7+
8+
9+
## Fixes
10+
11+
### Telemetry fixes
12+
13+
* Issues: #5179 .
14+
Fix issue in GetTelemetrySubscriptions with big-endian
15+
architectures where wrong values are read as
16+
accepted compression types causing the metrics to be sent uncompressed.
17+
Happening since 2.5.0. Since 2.10.1 unit tests are failing when run on
18+
big-endian architectures (#5183, @paravoid).
19+
20+
21+
122
# librdkafka v2.11.1
223

324
librdkafka v2.11.1 is a maintenance release:

src/rdkafka_mock_handlers.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2696,14 +2696,16 @@ static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn,
26962696
rd_kafka_Uuid_t ClientInstanceId;
26972697
int32_t SubscriptionId;
26982698
rd_bool_t terminating;
2699+
int8_t CompressionType;
26992700
rd_kafka_compression_t compression_type = RD_KAFKA_COMPRESSION_NONE;
27002701
rd_kafkap_bytes_t metrics;
27012702
rd_kafka_resp_err_t err;
27022703

27032704
rd_kafka_buf_read_uuid(rkbuf, &ClientInstanceId);
27042705
rd_kafka_buf_read_i32(rkbuf, &SubscriptionId);
27052706
rd_kafka_buf_read_bool(rkbuf, &terminating);
2706-
rd_kafka_buf_read_i8(rkbuf, &compression_type);
2707+
rd_kafka_buf_read_i8(rkbuf, &CompressionType);
2708+
compression_type = CompressionType;
27072709
rd_kafka_buf_read_kbytes(rkbuf, &metrics);
27082710

27092711
void *uncompressed_payload = NULL;

src/rdkafka_request.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6517,15 +6517,14 @@ rd_kafka_PushTelemetryRequest(rd_kafka_broker_t *rkb,
65176517
}
65186518

65196519
size_t len = sizeof(rd_kafka_Uuid_t) + sizeof(int32_t) +
6520-
sizeof(rd_bool_t) + sizeof(compression_type) +
6521-
metrics_size;
6520+
sizeof(rd_bool_t) + sizeof(int8_t) + metrics_size;
65226521
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_PushTelemetry,
65236522
1, len, rd_true);
65246523

65256524
rd_kafka_buf_write_uuid(rkbuf, client_instance_id);
65266525
rd_kafka_buf_write_i32(rkbuf, subscription_id);
65276526
rd_kafka_buf_write_bool(rkbuf, terminating);
6528-
rd_kafka_buf_write_i8(rkbuf, compression_type);
6527+
rd_kafka_buf_write_i8(rkbuf, (int8_t)compression_type);
65296528

65306529
rd_dassert(metrics != NULL);
65316530
rd_dassert(metrics_size >= 0);
@@ -6590,10 +6589,12 @@ void rd_kafka_handle_GetTelemetrySubscriptions(rd_kafka_t *rk,
65906589
rk->rk_telemetry.accepted_compression_types =
65916590
rd_calloc(arraycnt, sizeof(rd_kafka_compression_t));
65926591

6593-
for (i = 0; i < (size_t)arraycnt; i++)
6594-
rd_kafka_buf_read_i8(
6595-
rkbuf,
6596-
&rk->rk_telemetry.accepted_compression_types[i]);
6592+
for (i = 0; i < (size_t)arraycnt; i++) {
6593+
int8_t AcceptedCompressionType;
6594+
rd_kafka_buf_read_i8(rkbuf, &AcceptedCompressionType);
6595+
rk->rk_telemetry.accepted_compression_types[i] =
6596+
AcceptedCompressionType;
6597+
}
65976598
} else {
65986599
rk->rk_telemetry.accepted_compression_types_cnt = 1;
65996600
rk->rk_telemetry.accepted_compression_types =

0 commit comments

Comments
 (0)