
Commit 90dcd3a

Merge pull request #118 from codecrafters-io/refactor-record-batches
Refactor record batch and children to use kafka values
2 parents 80e0d2a + 193203d commit 90dcd3a

File tree

5 files changed: +119 -94 lines
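
Every file in this PR follows the same pattern: struct fields that were plain Go primitives (`int64`, `int32`, `int16`, `int8`, `[]byte`) become thin wrapper structs from the `protocol/value` package, and every read site dereferences the wrapped `.Value`. Only `value.Int64` is added in this diff (see `protocol/value/int64.go` at the bottom); the other wrappers must already exist in the package, since the diff uses them. A minimal sketch of what they presumably look like, by analogy with `Int64` — the exact `String()`/`GetType()` bodies are assumptions:

```go
package value

import "fmt"

// Int32 is assumed to mirror the Int64 wrapper added in this PR.
type Int32 struct {
	Value int32
}

func (v Int32) String() string  { return fmt.Sprintf("%d", v.Value) }
func (v Int32) GetType() string { return "INT32" }

// RawBytes is assumed to wrap a byte slice; record.go below reads it via .Value.
type RawBytes struct {
	Value []byte
}
```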

protocol/kafka_files_generator/cluster_metadata_generator.go

Lines changed: 31 additions & 30 deletions
```diff
@@ -7,6 +7,7 @@ import (
 
 	"github.com/codecrafters-io/kafka-tester/protocol/encoder"
 	"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi"
+	"github.com/codecrafters-io/kafka-tester/protocol/value"
 )
 
 type ClusterMetadataGenerator struct {
@@ -50,21 +51,21 @@ func (g *ClusterMetadataGenerator) writeLogFile() error {
 	}
 
 	recordBatch1 := kafkaapi.RecordBatch{
-		BaseOffset:           baseOffset,
-		PartitionLeaderEpoch: 1,
-		Attributes:           0,
-		LastOffsetDelta:      0, // len(records) - 1
-		FirstTimestamp:       1726045943832,
-		MaxTimestamp:         1726045943832,
-		ProducerId:           -1,
-		ProducerEpoch:        -1,
-		BaseSequence:         -1,
+		BaseOffset:           value.Int64{Value: baseOffset},
+		PartitionLeaderEpoch: value.Int32{Value: 1},
+		Attributes:           value.Int16{Value: 0},
+		LastOffsetDelta:      value.Int32{Value: 0}, // len(records) - 1
+		FirstTimestamp:       value.Int64{Value: 1726045943832},
+		MaxTimestamp:         value.Int64{Value: 1726045943832},
+		ProducerId:           value.Int64{Value: -1},
+		ProducerEpoch:        value.Int16{Value: -1},
+		BaseSequence:         value.Int32{Value: -1},
 		Records: []kafkaapi.Record{
 			{
-				Attributes:     0,
-				TimestampDelta: 0,
-				Key:            nil,
-				Value:          GetEncodedBytes(featureLevelRecord),
+				Attributes:     value.Int8{Value: 0},
+				TimestampDelta: value.Int64{Value: 0},
+				Key:            value.RawBytes{Value: nil},
+				Value:          value.RawBytes{Value: GetEncodedBytes(featureLevelRecord)},
 				Headers:        []kafkaapi.RecordHeader{},
 			},
 		},
@@ -113,34 +114,34 @@ func (g *ClusterMetadataGenerator) writeLogFile() error {
 
 	// Add topic record
 	records = append(records, kafkaapi.Record{
-		Attributes:     0,
-		TimestampDelta: 0,
-		Key:            nil,
-		Value:          GetEncodedBytes(topicRecord),
+		Attributes:     value.Int8{Value: 0},
+		TimestampDelta: value.Int64{Value: 0},
+		Key:            value.RawBytes{Value: nil},
+		Value:          value.RawBytes{Value: GetEncodedBytes(topicRecord)},
 		Headers:        []kafkaapi.RecordHeader{},
 	})
 
 	// Add all partition records
 	for _, partitionRecord := range partitionRecords {
 		records = append(records, kafkaapi.Record{
-			Attributes:     0,
-			TimestampDelta: 0,
-			Key:            nil,
-			Value:          GetEncodedBytes(partitionRecord),
+			Attributes:     value.Int8{Value: 0},
+			TimestampDelta: value.Int64{Value: 0},
+			Key:            value.RawBytes{Value: nil},
+			Value:          value.RawBytes{Value: GetEncodedBytes(partitionRecord)},
 			Headers:        []kafkaapi.RecordHeader{},
 		})
 	}
 
 	recordBatch := kafkaapi.RecordBatch{
-		BaseOffset:           baseOffset,
-		PartitionLeaderEpoch: 1,
-		Attributes:           0,
-		LastOffsetDelta:      int32(len(records) - 1),
-		FirstTimestamp:       1726045957397,
-		MaxTimestamp:         1726045957397,
-		ProducerId:           -1,
-		ProducerEpoch:        -1,
-		BaseSequence:         -1,
+		BaseOffset:           value.Int64{Value: baseOffset},
+		PartitionLeaderEpoch: value.Int32{Value: 1},
+		Attributes:           value.Int16{Value: 0},
+		LastOffsetDelta:      value.Int32{Value: int32(len(records) - 1)},
+		FirstTimestamp:       value.Int64{Value: 1726045957397},
+		MaxTimestamp:         value.Int64{Value: 1726045957397},
+		ProducerId:           value.Int64{Value: -1},
+		ProducerEpoch:        value.Int16{Value: -1},
+		BaseSequence:         value.Int32{Value: -1},
 		Records: records,
 	}
 	recordBatches = append(recordBatches, recordBatch)
```
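
One carried-over subtlety: `LastOffsetDelta` must equal `len(records) - 1`. The first batch holds a single record, so the literal `0` is correct (hence the `// len(records) - 1` comment), while the second batch computes it from the slice. A hypothetical helper, not part of this PR, that would make the invariant explicit:

```go
// lastOffsetDelta is a hypothetical helper illustrating the invariant;
// the PR writes the expression inline instead.
func lastOffsetDelta(records []kafkaapi.Record) value.Int32 {
	return value.Int32{Value: int32(len(records) - 1)}
}
```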

protocol/kafka_files_generator/partition_generator.go

Lines changed: 14 additions & 13 deletions
```diff
@@ -7,6 +7,7 @@ import (
 
 	"github.com/codecrafters-io/kafka-tester/protocol/encoder"
 	"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi"
+	"github.com/codecrafters-io/kafka-tester/protocol/value"
 	"github.com/codecrafters-io/tester-utils/logger"
 )
 
@@ -95,21 +96,21 @@ func (c *PartitionGenerationConfig) generateRecordBatchesFromLogs(logs []string)
 
 	for i, message := range logs {
 		recordBatches = append(recordBatches, kafkaapi.RecordBatch{
-			BaseOffset:           int64(i),
-			PartitionLeaderEpoch: 0,
-			Attributes:           0,
-			LastOffsetDelta:      0,
-			FirstTimestamp:       1726045973899,
-			MaxTimestamp:         1726045973899,
-			ProducerId:           0,
-			ProducerEpoch:        0,
-			BaseSequence:         0,
+			BaseOffset:           value.Int64{Value: int64(i)},
+			PartitionLeaderEpoch: value.Int32{Value: 0},
+			Attributes:           value.Int16{Value: 0},
+			LastOffsetDelta:      value.Int32{Value: 0},
+			FirstTimestamp:       value.Int64{Value: 1726045973899},
+			MaxTimestamp:         value.Int64{Value: 1726045973899},
+			ProducerId:           value.Int64{Value: 0},
+			ProducerEpoch:        value.Int16{Value: 0},
+			BaseSequence:         value.Int32{Value: 0},
 			Records: []kafkaapi.Record{
 				{
-					Attributes:     0,
-					TimestampDelta: 0,
-					Key:            nil,
-					Value:          []byte(message),
+					Attributes:     value.Int8{Value: 0},
+					TimestampDelta: value.Int64{Value: 0},
+					Key:            value.RawBytes{},
+					Value:          value.RawBytes{Value: []byte(message)},
 					Headers:        []kafkaapi.RecordHeader{},
 				},
 			},
```
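
Note that this file writes the nil key as `value.RawBytes{}` while cluster_metadata_generator.go spells it `value.RawBytes{Value: nil}`. Assuming `RawBytes` is a struct with a single `Value []byte` field (consistent with how record.go reads it), the two forms are identical — both leave `Value` as a nil slice, so `Record.Encode` emits the nil-key marker either way:

```go
package main

import (
	"fmt"

	"github.com/codecrafters-io/kafka-tester/protocol/value"
)

func main() {
	a := value.RawBytes{}           // form used in partition_generator.go
	b := value.RawBytes{Value: nil} // form used in cluster_metadata_generator.go
	fmt.Println(a.Value == nil, b.Value == nil) // true true
}
```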

protocol/kafkaapi/record.go

Lines changed: 24 additions & 21 deletions
```diff
@@ -1,35 +1,38 @@
 package kafkaapi
 
-import "github.com/codecrafters-io/kafka-tester/protocol/encoder"
+import (
+	"github.com/codecrafters-io/kafka-tester/protocol/encoder"
+	"github.com/codecrafters-io/kafka-tester/protocol/value"
+)
 
 type Record struct {
-	Length         int32
-	Attributes     int8
-	TimestampDelta int64
-	OffsetDelta    int32
-	Key            []byte
-	Value          []byte
+	Length         value.Int32
+	Attributes     value.Int8
+	TimestampDelta value.Int64
+	OffsetDelta    value.Int32
+	Key            value.RawBytes
+	Value          value.RawBytes
 	Headers        []RecordHeader
 }
 
 func (r Record) Encode(pe *encoder.Encoder) {
 	propertiesEncoder := encoder.NewEncoder()
 
-	propertiesEncoder.WriteInt8(r.Attributes)
-	propertiesEncoder.WriteVarint(r.TimestampDelta)
-	propertiesEncoder.WriteVarint(int64(r.OffsetDelta))
+	propertiesEncoder.WriteInt8(r.Attributes.Value)
+	propertiesEncoder.WriteVarint(r.TimestampDelta.Value)
+	propertiesEncoder.WriteVarint(int64(r.OffsetDelta.Value))
 
 	// Special encoding that does not belong to any data type and is only present inside Records
 	// similar to protobuf encoding. It is mentioned in the Kafka docs here: https://kafka.apache.org/documentation/#recordheader
-	if r.Key == nil {
+	if r.Key.Value == nil {
 		propertiesEncoder.WriteVarint(-1)
 	} else {
-		propertiesEncoder.WriteVarint(int64(len(r.Key)))
-		propertiesEncoder.WriteRawBytes(r.Key)
+		propertiesEncoder.WriteVarint(int64(len(r.Key.Value)))
+		propertiesEncoder.WriteRawBytes(r.Key.Value)
 	}
 
-	propertiesEncoder.WriteVarint(int64(len(r.Value)))
-	propertiesEncoder.WriteRawBytes(r.Value)
+	propertiesEncoder.WriteVarint(int64(len(r.Value.Value)))
+	propertiesEncoder.WriteRawBytes(r.Value.Value)
 	propertiesEncoder.WriteVarint(int64(len(r.Headers)))
 	for _, header := range r.Headers {
 		header.Encode(propertiesEncoder)
@@ -42,13 +45,13 @@ func (r Record) Encode(pe *encoder.Encoder) {
 }
 
 type RecordHeader struct {
-	Key   string
-	Value []byte
+	Key   value.RawBytes
+	Value value.RawBytes
 }
 
 func (rh RecordHeader) Encode(pe *encoder.Encoder) {
-	pe.WriteVarint(int64(len(rh.Key)))
-	pe.WriteRawBytes([]byte(rh.Key))
-	pe.WriteVarint(int64(len(rh.Value)))
-	pe.WriteRawBytes(rh.Value)
+	pe.WriteVarint(int64(len(rh.Key.Value)))
+	pe.WriteRawBytes(rh.Key.Value)
+	pe.WriteVarint(int64(len(rh.Value.Value)))
+	pe.WriteRawBytes(rh.Value.Value)
 }
```
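
The "special encoding" the comment refers to is Kafka's record-level varint scheme: key and value lengths are zigzag-encoded varints, and a nil key is written as length -1 rather than 0 (which would mean an empty, non-nil key). A standalone sketch of what lands on the wire, assuming the tester's `encoder.WriteVarint` matches the standard library's zigzag `binary.PutVarint`:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)

	// nil key: length -1, zigzag-encoded, is the single byte 0x01
	n := binary.PutVarint(buf, -1)
	fmt.Printf("nil key marker: % x\n", buf[:n]) // 01

	// a 5-byte value: length 5, zigzag-encoded, is the single byte 0x0a
	n = binary.PutVarint(buf, 5)
	fmt.Printf("length 5: % x\n", buf[:n]) // 0a
}
```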

protocol/kafkaapi/record_batch.go

Lines changed: 35 additions & 30 deletions
```diff
@@ -4,59 +4,64 @@ import (
 	"hash/crc32"
 
 	"github.com/codecrafters-io/kafka-tester/protocol/encoder"
+	"github.com/codecrafters-io/kafka-tester/protocol/value"
 )
 
 type RecordBatches []RecordBatch
 
 func (rbs RecordBatches) Encode(pe *encoder.Encoder) {
-	for _, rb := range rbs {
-		rb.Encode(pe)
+	for i := range rbs {
+		rbs[i].Encode(pe)
 	}
 }
 
 type RecordBatch struct {
-	BaseOffset           int64
-	BatchLength          int32
-	PartitionLeaderEpoch int32
-	Magic                int8
-	CRC                  int32
-	Attributes           int16
-	LastOffsetDelta      int32
-	FirstTimestamp       int64
-	MaxTimestamp         int64
-	ProducerId           int64
-	ProducerEpoch        int16
-	BaseSequence         int32
+	BaseOffset           value.Int64
+	BatchLength          value.Int32
+	PartitionLeaderEpoch value.Int32
+	Magic                value.Int8
+	CRC                  value.Int32
+	Attributes           value.Int16
+	LastOffsetDelta      value.Int32
+	FirstTimestamp       value.Int64
+	MaxTimestamp         value.Int64
+	ProducerId           value.Int64
+	ProducerEpoch        value.Int16
+	BaseSequence         value.Int32
 	Records              []Record
 }
 
-func (rb RecordBatch) Encode(pe *encoder.Encoder) {
+func (rb *RecordBatch) Encode(pe *encoder.Encoder) {
 	propertiesEncoder := encoder.NewEncoder()
-	propertiesEncoder.WriteInt16(rb.Attributes)
-	propertiesEncoder.WriteInt32(rb.LastOffsetDelta)
-	propertiesEncoder.WriteInt64(rb.FirstTimestamp)
-	propertiesEncoder.WriteInt64(rb.MaxTimestamp)
-	propertiesEncoder.WriteInt64(rb.ProducerId)
-	propertiesEncoder.WriteInt16(rb.ProducerEpoch)
-	propertiesEncoder.WriteInt32(rb.BaseSequence)
+	propertiesEncoder.WriteInt16(rb.Attributes.Value)
+	propertiesEncoder.WriteInt32(rb.LastOffsetDelta.Value)
+	propertiesEncoder.WriteInt64(rb.FirstTimestamp.Value)
+	propertiesEncoder.WriteInt64(rb.MaxTimestamp.Value)
+	propertiesEncoder.WriteInt64(rb.ProducerId.Value)
+	propertiesEncoder.WriteInt16(rb.ProducerEpoch.Value)
+	propertiesEncoder.WriteInt32(rb.BaseSequence.Value)
 	propertiesEncoder.WriteInt32(int32(len(rb.Records)))
 
 	for i, record := range rb.Records {
-		record.OffsetDelta = int32(i) // Offset Deltas are consecutive numerals from 0 to N-1
+		record.OffsetDelta = value.Int32{Value: int32(i)} // Offset Deltas are consecutive numerals from 0 to N-1
 		// We can set them programmatically as we know the order of the records
 		record.Encode(propertiesEncoder)
 	}
 
 	propertiesEncoderBytes := propertiesEncoder.Bytes()
 	computedChecksum := crc32.Checksum(propertiesEncoderBytes, crc32.MakeTable(crc32.Castagnoli))
-	rb.CRC = int32(computedChecksum)
-	rb.BatchLength = int32(len(propertiesEncoderBytes) + 4 + 1 + 4) // partitionLeaderEpoch + magic value + CRC
+	rb.CRC = value.Int32{
+		Value: int32(computedChecksum),
+	}
+	rb.BatchLength = value.Int32{
+		Value: int32(len(propertiesEncoderBytes) + 4 + 1 + 4), // partitionLeaderEpoch + magic value + CRC
+	}
 
 	// Encode everything now
-	pe.WriteInt64(rb.BaseOffset)
-	pe.WriteInt32(rb.BatchLength)
-	pe.WriteInt32(rb.PartitionLeaderEpoch)
-	pe.WriteInt8(2)       // Magic value is 2
-	pe.WriteInt32(rb.CRC) // CRC placeholder
+	pe.WriteInt64(rb.BaseOffset.Value)
+	pe.WriteInt32(rb.BatchLength.Value)
+	pe.WriteInt32(rb.PartitionLeaderEpoch.Value)
+	pe.WriteInt8(2)             // Magic value is 2
+	pe.WriteInt32(rb.CRC.Value) // CRC placeholder
 	pe.WriteRawBytes(propertiesEncoderBytes)
 }
```
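
Two details worth noting in this file. First, `Encode` switches to a pointer receiver because it now writes the computed `CRC` and `BatchLength` back into the struct; the `RecordBatches` loop correspondingly switches from ranging by value to indexing (`rbs[i].Encode(pe)`), presumably so those write-backs land on the stored batches rather than on a per-iteration copy that is then discarded. Second, the checksum is CRC-32C (Castagnoli) over the attributes-through-records payload, and `BatchLength` counts that payload plus the 4-byte partition leader epoch, 1-byte magic, and 4-byte CRC fields that precede it. A self-contained sketch of the arithmetic:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	// Stand-in for the encoded attributes-through-records payload.
	payload := []byte{0x00, 0x00, 0x00, 0x00}

	// Kafka record batches use CRC-32C (Castagnoli), as in RecordBatch.Encode.
	crc := crc32.Checksum(payload, crc32.MakeTable(crc32.Castagnoli))

	// BatchLength = payload + partitionLeaderEpoch (4) + magic (1) + CRC (4),
	// the fields that sit between BatchLength and the payload on the wire.
	batchLength := len(payload) + 4 + 1 + 4

	fmt.Printf("crc=0x%08x batchLength=%d\n", crc, batchLength)
}
```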

protocol/value/int64.go

Lines changed: 15 additions & 0 deletions
```diff
@@ -0,0 +1,15 @@
+package value
+
+import "fmt"
+
+type Int64 struct {
+	Value int64
+}
+
+func (v Int64) String() string {
+	return fmt.Sprintf("%d", v.Value)
+}
+
+func (v Int64) GetType() string {
+	return "INT64"
+}
```
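
A quick usage sketch for the new wrapper — `String()` makes wrapped values printable, and `GetType()` presumably feeds the tester's logging or assertion output:

```go
package main

import (
	"fmt"

	"github.com/codecrafters-io/kafka-tester/protocol/value"
)

func main() {
	offset := value.Int64{Value: 1726045943832}
	fmt.Println(offset.String())  // 1726045943832
	fmt.Println(offset.GetType()) // INT64
}
```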
