Skip to content

Commit b148348

Browse files
committed
Reduced memory overhead of preparing LZ4-compressed data for server.
Instead of compressing a whole serialized block at once, compress it in reasonably sized chunks. This removes some temporary buffers and reduces memory pressure. Also some minor refactoring: moved all serialization-format code to the WireFormat class; removed the CodedOutputStream and CodedInputStream classes.
1 parent 5cfda7f commit b148348

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+566
-462
lines changed

clickhouse/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ SET ( clickhouse-cpp-lib-src
55
base/output.cpp
66
base/platform.cpp
77
base/socket.cpp
8+
base/wire_format.cpp
89

910
columns/array.cpp
1011
columns/date.cpp

clickhouse/base/coded.cpp

Lines changed: 16 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -4,97 +4,27 @@
44

55
namespace clickhouse {
66

7-
static const int MAX_VARINT_BYTES = 10;
7+
//static const int MAX_VARINT_BYTES = 10;
88

9-
CodedInputStream::CodedInputStream(ZeroCopyInput* input)
10-
: input_(input)
11-
{
12-
}
13-
14-
bool CodedInputStream::ReadRaw(void* buffer, size_t size) {
15-
uint8_t* p = static_cast<uint8_t*>(buffer);
16-
17-
while (size > 0) {
18-
const void* ptr;
19-
size_t len = input_->Next(&ptr, size);
20-
21-
memcpy(p, ptr, len);
22-
23-
p += len;
24-
size -= len;
25-
}
26-
27-
return true;
28-
}
29-
30-
bool CodedInputStream::Skip(size_t count) {
31-
while (count > 0) {
32-
const void* ptr;
33-
size_t len = input_->Next(&ptr, count);
34-
35-
if (len == 0) {
36-
return false;
37-
}
38-
39-
count -= len;
40-
}
41-
42-
return true;
43-
}
44-
45-
bool CodedInputStream::ReadVarint64(uint64_t* value) {
46-
*value = 0;
9+
//CodedInputStream::CodedInputStream(ZeroCopyInput* input)
10+
// : input_(input)
11+
//{
12+
//}
4713

48-
for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
49-
uint8_t byte;
14+
//bool CodedInputStream::ReadRaw(void* buffer, size_t size) {
15+
// uint8_t* p = static_cast<uint8_t*>(buffer);
5016

51-
if (!input_->ReadByte(&byte)) {
52-
return false;
53-
} else {
54-
*value |= uint64_t(byte & 0x7F) << (7 * i);
17+
// while (size > 0) {
18+
// const void* ptr;
19+
// size_t len = input_->Next(&ptr, size);
5520

56-
if (!(byte & 0x80)) {
57-
return true;
58-
}
59-
}
60-
}
21+
// memcpy(p, ptr, len);
6122

62-
// TODO skip invalid
63-
return false;
64-
}
65-
66-
67-
CodedOutputStream::CodedOutputStream(ZeroCopyOutput* output)
68-
: output_(output)
69-
{
70-
}
23+
// p += len;
24+
// size -= len;
25+
// }
7126

72-
void CodedOutputStream::Flush() {
73-
output_->Flush();
74-
}
75-
76-
void CodedOutputStream::WriteRaw(const void* buffer, int size) {
77-
output_->Write(buffer, size);
78-
}
79-
80-
void CodedOutputStream::WriteVarint64(uint64_t value) {
81-
uint8_t bytes[MAX_VARINT_BYTES];
82-
int size = 0;
83-
84-
for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
85-
uint8_t byte = value & 0x7F;
86-
if (value > 0x7F)
87-
byte |= 0x80;
88-
89-
bytes[size++] = byte;
90-
91-
value >>= 7;
92-
if (!value) {
93-
break;
94-
}
95-
}
96-
97-
WriteRaw(bytes, size);
98-
}
27+
// return true;
28+
//}
9929

10030
}

clickhouse/base/coded.h

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -7,59 +7,4 @@
77

88
namespace clickhouse {
99

10-
/**
11-
* Class which reads and decodes binary data which is composed of varint-
12-
* encoded integers and fixed-width pieces.
13-
*/
14-
class CodedInputStream {
15-
public:
16-
/// Create a CodedInputStream that reads from the given ZeroCopyInput.
17-
explicit CodedInputStream(ZeroCopyInput* input);
18-
19-
// Read an unsigned integer with Varint encoding, truncating to 32 bits.
20-
// Reading a 32-bit value is equivalent to reading a 64-bit one and casting
21-
// it to uint32, but may be more efficient.
22-
bool ReadVarint32(uint32_t* value);
23-
24-
// Read an unsigned integer with Varint encoding.
25-
bool ReadVarint64(uint64_t* value);
26-
27-
// Read raw bytes, copying them into the given buffer.
28-
bool ReadRaw(void* buffer, size_t size);
29-
30-
// Like ReadRaw, but reads into a string.
31-
//
32-
// Implementation Note: ReadString() grows the string gradually as it
33-
// reads in the data, rather than allocating the entire requested size
34-
// upfront. This prevents denial-of-service attacks in which a client
35-
// could claim that a string is going to be MAX_INT bytes long in order to
36-
// crash the server because it can't allocate this much space at once.
37-
bool ReadString(std::string* buffer, int size);
38-
39-
// Skips a number of bytes. Returns false if an underlying read error
40-
// occurs.
41-
bool Skip(size_t count);
42-
43-
private:
44-
ZeroCopyInput* input_;
45-
};
46-
47-
48-
class CodedOutputStream {
49-
public:
50-
/// Create a CodedInputStream that writes to the given ZeroCopyOutput.
51-
explicit CodedOutputStream(ZeroCopyOutput* output);
52-
53-
void Flush();
54-
55-
// Write raw bytes, copying them from the given buffer.
56-
void WriteRaw(const void* buffer, int size);
57-
58-
/// Write an unsigned integer with Varint encoding.
59-
void WriteVarint64(const uint64_t value);
60-
61-
private:
62-
ZeroCopyOutput* output_;
63-
};
64-
6510
}

clickhouse/base/compressed.cpp

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
#include "compressed.h"
22
#include "wire_format.h"
3+
#include "output.h"
34

45
#include <cityhash/city.h>
56
#include <lz4/lz4.h>
67
#include <stdexcept>
78
#include <system_error>
89

10+
#include <iostream>
11+
12+
namespace {

// Size in bytes of the per-chunk header: 1 byte compression method,
// 4 bytes compressed size (header included) and 4 bytes original size.
const size_t HEADER_SIZE = 9;

// Extra slack added on top of LZ4_compressBound() when the compression
// buffer is (re)sized.
const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15;

// Method byte stored in the chunk header; the payload that follows is
// produced/consumed with the LZ4 routines.
const uint8_t COMPRESSION_METHOD = 0x82;

// Note: being a macro, this name is not actually scoped by the namespace.
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB

}
1018

1119
namespace clickhouse {
1220

13-
CompressedInput::CompressedInput(CodedInputStream* input)
21+
CompressedInput::CompressedInput(InputStream* input)
1422
: input_(input)
1523
{
1624
}
@@ -50,7 +58,7 @@ bool CompressedInput::Decompress() {
5058
return false;
5159
}
5260

53-
if (method != 0x82) {
61+
if (method != COMPRESSION_METHOD) {
5462
throw std::runtime_error("unsupported compression method " +
5563
std::to_string(int(method)));
5664
} else {
@@ -75,7 +83,7 @@ bool CompressedInput::Decompress() {
7583
out.Write(&original, sizeof(original));
7684
}
7785

78-
if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
86+
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
7987
return false;
8088
} else {
8189
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
@@ -85,7 +93,7 @@ bool CompressedInput::Decompress() {
8593

8694
data_ = Buffer(original);
8795

88-
if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) {
96+
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
8997
throw std::runtime_error("can't decompress data");
9098
} else {
9199
mem_.Reset(data_.data(), original);
@@ -95,4 +103,64 @@ bool CompressedInput::Decompress() {
95103
return true;
96104
}
97105

106+
107+
CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
108+
: destination_(destination),
109+
max_compressed_chunk_size_(max_compressed_chunk_size)
110+
{
111+
}
112+
113+
/// Flushes the stream before it goes away.
// NOTE(review): if Flush() can throw, this destructor would terminate the
// program (destructors are implicitly noexcept) — confirm Flush() is safe here.
CompressedOutput::~CompressedOutput() {
    this->Flush();
}
116+
117+
size_t CompressedOutput::DoWrite(const void* data, size_t len) {
118+
const size_t original_len = len;
119+
const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len;
120+
121+
while (len > 0)
122+
{
123+
auto to_compress = std::min(len, max_chunk_size);
124+
if (!Compress(data, to_compress))
125+
break;
126+
127+
len -= to_compress;
128+
data = reinterpret_cast<const char*>(data) + to_compress;
129+
}
130+
131+
return original_len - len;
132+
}
133+
134+
/// Forwards the flush request to the destination stream. Compress() already
/// writes every chunk to the destination as soon as it is produced, so there
/// is no pending compressed data to emit here.
void CompressedOutput::DoFlush() {
    destination_->Flush();
}
137+
138+
bool CompressedOutput::Compress(const void * data, size_t len) {
139+
140+
const size_t expected_out_size = LZ4_compressBound(len);
141+
compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER));
142+
143+
const int compressed_size = LZ4_compress_default(
144+
(const char*)data,
145+
(char*)compressed_buffer_.data() + HEADER_SIZE,
146+
len,
147+
compressed_buffer_.size() - HEADER_SIZE);
148+
149+
{
150+
auto header = compressed_buffer_.data();
151+
WriteUnaligned(header, COMPRESSION_METHOD);
152+
// Compressed data size with header
153+
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
154+
// Original data size
155+
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
156+
}
157+
158+
WireFormat::WriteFixed(destination_, CityHash128(
159+
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
160+
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
161+
162+
destination_->Flush();
163+
return true;
164+
}
165+
98166
}

clickhouse/base/compressed.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#pragma once
22

3-
#include "coded.h"
3+
#include "input.h"
4+
#include "output.h"
5+
#include "buffer.h"
46

57
namespace clickhouse {
68

79
class CompressedInput : public ZeroCopyInput {
810
public:
9-
CompressedInput(CodedInputStream* input);
11+
CompressedInput(InputStream* input);
1012
~CompressedInput();
1113

1214
protected:
@@ -15,10 +17,27 @@ class CompressedInput : public ZeroCopyInput {
1517
bool Decompress();
1618

1719
private:
18-
CodedInputStream* const input_;
20+
InputStream* const input_;
1921

2022
Buffer data_;
2123
ArrayInput mem_;
2224
};
2325

26+
class CompressedOutput : public OutputStream {
27+
public:
28+
CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
29+
~CompressedOutput();
30+
31+
protected:
32+
size_t DoWrite(const void* data, size_t len) override;
33+
void DoFlush() override;
34+
bool Compress(const void * data, size_t len);
35+
36+
37+
private:
38+
OutputStream * destination_;
39+
Buffer compressed_buffer_;
40+
size_t max_compressed_chunk_size_;
41+
};
42+
2443
}

clickhouse/base/input.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@
55

66
namespace clickhouse {
77

8+
bool ZeroCopyInput::Skip(size_t bytes) {
9+
while (bytes > 0) {
10+
const void* ptr;
11+
size_t len = Next(&ptr, bytes);
12+
13+
if (len == 0) {
14+
return false;
15+
}
16+
17+
bytes -= len;
18+
}
19+
20+
return true;
21+
}
22+
823
size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
924
const void* ptr;
1025
size_t result = DoNext(&ptr, len);

clickhouse/base/input.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ class InputStream {
2121
return DoRead(buf, len);
2222
}
2323

24+
// Skips a number of bytes. Returns false if an underlying read error occurs.
25+
virtual bool Skip(size_t bytes) = 0;
26+
2427
protected:
2528
virtual size_t DoRead(void* buf, size_t len) = 0;
2629
};
@@ -32,6 +35,8 @@ class ZeroCopyInput : public InputStream {
3235
return DoNext(buf, len);
3336
}
3437

38+
bool Skip(size_t bytes) override;
39+
3540
protected:
3641
virtual size_t DoNext(const void** ptr, size_t len) = 0;
3742

0 commit comments

Comments
 (0)