Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ private void testHeaders(Map<String, Object> consumerConfig) throws Exception {
) {
var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
record.headers().add("headerKey", "headerValue".getBytes());
record.headers().add("headerKey2", "headerValue2".getBytes());
record.headers().add("headerKey3", "headerValue3".getBytes());
producer.send(record);
producer.flush();

assertEquals(0, consumer.assignment().size());
consumer.assign(List.of(TP));
Expand All @@ -212,8 +215,15 @@ var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getByt
consumer.seek(TP, 0);
var records = consumeRecords(consumer, numRecords);
assertEquals(numRecords, records.size());

var header = records.get(0).headers().lastHeader("headerKey");
assertEquals("headerValue", header == null ? null : new String(header.value()));

// Test the order of headers in a record is preserved when producing and consuming
Header[] headers = records.get(0).headers().toArray();
assertEquals("headerKey", headers[0].key());
assertEquals("headerKey2", headers[1].key());
assertEquals("headerKey3", headers[2].key());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,8 @@ public void testHeaders() {
int numRecords = 1;
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
record.headers().add("headerKey", "headerValue".getBytes());
record.headers().add("headerKey2", "headerValue2".getBytes());
record.headers().add("headerKey3", "headerValue3".getBytes());
producer.send(record);
producer.flush();

Expand All @@ -475,11 +477,15 @@ public void testHeaders() {
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());

for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
Header header = consumerRecord.headers().lastHeader("headerKey");
if (header != null)
assertEquals("headerValue", new String(header.value()));
}
Header header = records.get(0).headers().lastHeader("headerKey");
assertEquals("headerValue", new String(header.value()));

// Test the order of headers in a record is preserved when producing and consuming
Header[] headers = records.get(0).headers().toArray();
assertEquals("headerKey", headers[0].key());
assertEquals("headerKey2", headers[1].key());
assertEquals("headerKey3", headers[2].key());

verifyShareGroupStateTopicRecordsProduced();
}
}
Expand Down
17 changes: 15 additions & 2 deletions clients/src/main/java/org/apache/kafka/common/header/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@
*/
package org.apache.kafka.common.header;

/**
* A header is a key-value pair.
*/
public interface Header {


/**
* Returns the key of the header.
*
* @return the header's key; must not be null.
*/
String key();

/**
* Returns the value of the header.
*
* @return the header's value; may be null.
*/
byte[] value();

}
18 changes: 12 additions & 6 deletions clients/src/main/java/org/apache/kafka/common/header/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
*/
package org.apache.kafka.common.header;


/**
* A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}.
* <p>
* The order of headers is preserved in the order they were added.
*/
public interface Headers extends Iterable<Header> {

/**
* Adds a header (key inside), to the end, returning if the operation succeeded.
*
* @param header the Header to be added
* @param header the Header to be added.
* @return this instance of the Headers, once the header is added.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
Expand All @@ -30,8 +36,8 @@ public interface Headers extends Iterable<Header> {
/**
* Creates and adds a header, to the end, returning if the operation succeeded.
*
* @param key of the header to be added.
* @param value of the header to be added.
* @param key of the header to be added; must not be null.
* @param value of the header to be added; may be null.
* @return this instance of the Headers, once the header is added.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
Expand All @@ -40,7 +46,7 @@ public interface Headers extends Iterable<Header> {
/**
* Removes all headers for the given key returning if the operation succeeded.
*
* @param key to remove all headers for.
* @param key to remove all headers for; must not be null.
* @return this instance of the Headers, once the header is removed.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
Expand All @@ -49,15 +55,15 @@ public interface Headers extends Iterable<Header> {
/**
* Returns just one (the very last) header for the given key, if present.
*
* @param key to get the last header for.
* @param key to get the last header for; must not be null.
* @return this last header matching the given key, returns null if not present.
*/
Header lastHeader(String key);

/**
* Returns all headers for the given key, in the order they were added in, if present.
*
* @param key to return the headers for.
* @param key to return the headers for; must not be null.
* @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned.
*/
Iterable<Header> headers(String key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add documents and ut for the Iterable#remove?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ public void testAdd() {
assertEquals(2, getCount(headers));
}

@Test
public void testAddHeadersPreserveOder() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Oder -> Order

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Headers headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
headers.add(new RecordHeader("key2", "value2".getBytes()));
headers.add(new RecordHeader("key3", "value3".getBytes()));

Header[] headersArr = headers.toArray();
assertHeader("key", "value", headersArr[0]);
assertHeader("key2", "value2", headersArr[1]);
assertHeader("key3", "value3", headersArr[2]);

assertEquals(3, getCount(headers));
}

@Test
public void testRemove() {
Headers headers = new RecordHeaders();
Expand Down
1 change: 1 addition & 0 deletions docs/implementation.html
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ <h5 class="anchor-heading"><a id="recordheader" class="anchor-link"></a><a href=
headerKey: String
headerValueLength: varint
Value: byte[]</code></pre>
<p>The key of a record header is guaranteed to be non-null, while the value of a record header may be null. The order of headers in a record is preserved when producing and consuming.</p>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add comments to the public API Header?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
is also encoded as a varint.</p>

Expand Down
Loading