Skip to content

Commit 85715df

Browse files
authored
Merge pull request #150 from Enmk/fix_exception_in_destructor
Refactored streams
2 parents 07fbf6d + da72498 commit 85715df

File tree

14 files changed

+163
-234
lines changed

14 files changed

+163
-234
lines changed

clickhouse/base/compressed.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,20 @@ bool CompressedInput::Decompress() {
5151
uint32_t original = 0;
5252
uint8_t method = 0;
5353

54-
if (!WireFormat::ReadFixed(input_, &hash)) {
54+
if (!WireFormat::ReadFixed(*input_, &hash)) {
5555
return false;
5656
}
57-
if (!WireFormat::ReadFixed(input_, &method)) {
57+
if (!WireFormat::ReadFixed(*input_, &method)) {
5858
return false;
5959
}
6060

6161
if (method != COMPRESSION_METHOD) {
6262
throw std::runtime_error("unsupported compression method " + std::to_string(int(method)));
6363
} else {
64-
if (!WireFormat::ReadFixed(input_, &compressed)) {
64+
if (!WireFormat::ReadFixed(*input_, &compressed)) {
6565
return false;
6666
}
67-
if (!WireFormat::ReadFixed(input_, &original)) {
67+
if (!WireFormat::ReadFixed(*input_, &original)) {
6868
return false;
6969
}
7070

@@ -80,9 +80,10 @@ bool CompressedInput::Decompress() {
8080
out.Write(&method, sizeof(method));
8181
out.Write(&compressed, sizeof(compressed));
8282
out.Write(&original, sizeof(original));
83+
out.Flush();
8384
}
8485

85-
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
86+
if (!WireFormat::ReadBytes(*input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
8687
return false;
8788
} else {
8889
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
@@ -110,9 +111,7 @@ CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compre
110111
PreallocateCompressBuffer(max_compressed_chunk_size);
111112
}
112113

113-
CompressedOutput::~CompressedOutput() {
114-
Flush();
115-
}
114+
CompressedOutput::~CompressedOutput() { }
116115

117116
size_t CompressedOutput::DoWrite(const void* data, size_t len) {
118117
const size_t original_len = len;
@@ -156,9 +155,9 @@ void CompressedOutput::Compress(const void * data, size_t len) {
156155
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
157156
}
158157

159-
WireFormat::WriteFixed(destination_, CityHash128(
158+
WireFormat::WriteFixed(*destination_, CityHash128(
160159
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
161-
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
160+
WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
162161

163162
destination_->Flush();
164163
}

clickhouse/base/input.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ size_t ArrayInput::DoNext(const void** ptr, size_t len) {
5656
}
5757

5858

59-
BufferedInput::BufferedInput(InputStream* slave, size_t buflen)
60-
: slave_(slave)
59+
BufferedInput::BufferedInput(std::unique_ptr<InputStream> source, size_t buflen)
60+
: source_(std::move(source))
6161
, array_input_(nullptr, 0)
6262
, buffer_(buflen)
6363
{
@@ -72,7 +72,7 @@ void BufferedInput::Reset() {
7272
size_t BufferedInput::DoNext(const void** ptr, size_t len) {
7373
if (array_input_.Exhausted()) {
7474
array_input_.Reset(
75-
buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
75+
buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
7676
);
7777
}
7878

@@ -82,11 +82,11 @@ size_t BufferedInput::DoNext(const void** ptr, size_t len) {
8282
size_t BufferedInput::DoRead(void* buf, size_t len) {
8383
if (array_input_.Exhausted()) {
8484
if (len > buffer_.size() / 2) {
85-
return slave_->Read(buf, len);
85+
return source_->Read(buf, len);
8686
}
8787

8888
array_input_.Reset(
89-
buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
89+
buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
9090
);
9191
}
9292

clickhouse/base/input.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <cstddef>
44
#include <cstdint>
55
#include <vector>
6+
#include <memory>
67

78
namespace clickhouse {
89

@@ -84,7 +85,7 @@ class ArrayInput : public ZeroCopyInput {
8485

8586
class BufferedInput : public ZeroCopyInput {
8687
public:
87-
BufferedInput(InputStream* slave, size_t buflen = 8192);
88+
BufferedInput(std::unique_ptr<InputStream> source, size_t buflen = 8192);
8889
~BufferedInput() override;
8990

9091
void Reset();
@@ -94,7 +95,7 @@ class BufferedInput : public ZeroCopyInput {
9495
size_t DoNext(const void** ptr, size_t len) override;
9596

9697
private:
97-
InputStream* const slave_;
98+
std::unique_ptr<InputStream> const source_;
9899
ArrayInput array_input_;
99100
std::vector<uint8_t> buffer_;
100101
};

clickhouse/base/output.cpp

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,35 +66,23 @@ size_t BufferOutput::DoNext(void** data, size_t len) {
6666
}
6767

6868

69-
BufferedOutput::BufferedOutput(OutputStream* slave, size_t buflen)
70-
: slave_(slave)
69+
BufferedOutput::BufferedOutput(std::unique_ptr<OutputStream> destination, size_t buflen)
70+
: destination_(std::move(destination))
7171
, buffer_(buflen)
7272
, array_output_(buffer_.data(), buflen)
7373
{
7474
}
7575

76-
BufferedOutput::~BufferedOutput() {
77-
try
78-
{
79-
Flush();
80-
}
81-
catch (...)
82-
{
83-
// That means we've failed to flush some data e.g. to the socket,
84-
// but there is nothing we can do at this point (can't bring the socket back),
85-
// and throwing in destructor is really a bad idea.
86-
// The best we can do is to log the error and ignore it, but currently there is no logging subsystem.
87-
}
88-
}
76+
BufferedOutput::~BufferedOutput() { }
8977

9078
void BufferedOutput::Reset() {
9179
array_output_.Reset(buffer_.data(), buffer_.size());
9280
}
9381

9482
void BufferedOutput::DoFlush() {
9583
if (array_output_.Data() != buffer_.data()) {
96-
slave_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
97-
slave_->Flush();
84+
destination_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
85+
destination_->Flush();
9886

9987
array_output_.Reset(buffer_.data(), buffer_.size());
10088
}
@@ -114,7 +102,7 @@ size_t BufferedOutput::DoWrite(const void* data, size_t len) {
114102
Flush();
115103

116104
if (len > buffer_.size() / 2) {
117-
return slave_->Write(data, len);
105+
return destination_->Write(data, len);
118106
}
119107
}
120108

clickhouse/base/output.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <cstdint>
77
#include <vector>
88
#include <memory.h>
9+
#include <memory>
910

1011
namespace clickhouse {
1112

@@ -92,11 +93,13 @@ class ArrayOutput : public ZeroCopyOutput {
9293

9394
/**
9495
* A ZeroCopyOutput stream backed by a vector.
96+
*
97+
* Doesn't Flush() in destructor, client must ensure to do it manually at some point.
9598
*/
9699
class BufferOutput : public ZeroCopyOutput {
97100
public:
98101
BufferOutput(Buffer* buf);
99-
~BufferOutput();
102+
~BufferOutput() override;
100103

101104
protected:
102105
size_t DoNext(void** data, size_t len) override;
@@ -106,10 +109,16 @@ class BufferOutput : public ZeroCopyOutput {
106109
size_t pos_;
107110
};
108111

109-
112+
/** BufferedOutput writes data to internal buffer first.
113+
*
114+
* Any data goes to underlying stream only if internal buffer is full
115+
* or when client invokes Flush() on this.
116+
*
117+
* Doesn't Flush() in destructor, client must ensure to do it manually at some point.
118+
*/
110119
class BufferedOutput : public ZeroCopyOutput {
111120
public:
112-
BufferedOutput(OutputStream* slave, size_t buflen = 8192);
121+
explicit BufferedOutput(std::unique_ptr<OutputStream> destination, size_t buflen = 8192);
113122
~BufferedOutput() override;
114123

115124
void Reset();
@@ -120,7 +129,7 @@ class BufferedOutput : public ZeroCopyOutput {
120129
size_t DoWrite(const void* data, size_t len) override;
121130

122131
private:
123-
OutputStream* const slave_;
132+
std::unique_ptr<OutputStream> const destination_;
124133
Buffer buffer_;
125134
ArrayOutput array_output_;
126135
};

clickhouse/base/streamstack.h

Lines changed: 0 additions & 59 deletions
This file was deleted.

clickhouse/base/wire_format.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ constexpr int MAX_VARINT_BYTES = 10;
1111

1212
namespace clickhouse {
1313

14-
bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) {
14+
bool WireFormat::ReadAll(InputStream& input, void* buf, size_t len) {
1515
uint8_t* p = static_cast<uint8_t*>(buf);
1616

1717
size_t read_previously = 1; // 1 to execute loop at least once
1818
while (len > 0 && read_previously) {
19-
read_previously = input->Read(p, len);
19+
read_previously = input.Read(p, len);
2020

2121
p += read_previously;
2222
len -= read_previously;
@@ -25,13 +25,13 @@ bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) {
2525
return !len;
2626
}
2727

28-
void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) {
28+
void WireFormat::WriteAll(OutputStream& output, const void* buf, size_t len) {
2929
const size_t original_len = len;
3030
const uint8_t* p = static_cast<const uint8_t*>(buf);
3131

3232
size_t written_previously = 1; // 1 to execute loop at least once
3333
while (len > 0 && written_previously) {
34-
written_previously = output->Write(p, len);
34+
written_previously = output.Write(p, len);
3535

3636
p += written_previously;
3737
len -= written_previously;
@@ -43,13 +43,13 @@ void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) {
4343
}
4444
}
4545

46-
bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) {
46+
bool WireFormat::ReadVarint64(InputStream& input, uint64_t* value) {
4747
*value = 0;
4848

4949
for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
5050
uint8_t byte = 0;
5151

52-
if (!input->ReadByte(&byte)) {
52+
if (!input.ReadByte(&byte)) {
5353
return false;
5454
} else {
5555
*value |= uint64_t(byte & 0x7F) << (7 * i);
@@ -64,7 +64,7 @@ bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) {
6464
return false;
6565
}
6666

67-
void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) {
67+
void WireFormat::WriteVarint64(OutputStream& output, uint64_t value) {
6868
uint8_t bytes[MAX_VARINT_BYTES];
6969
int size = 0;
7070

@@ -84,14 +84,14 @@ void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) {
8484
WriteAll(output, bytes, size);
8585
}
8686

87-
bool WireFormat::SkipString(InputStream* input) {
87+
bool WireFormat::SkipString(InputStream& input) {
8888
uint64_t len = 0;
8989

9090
if (ReadVarint64(input, &len)) {
9191
if (len > 0x00FFFFFFULL)
9292
return false;
9393

94-
return input->Skip((size_t)len);
94+
return input.Skip((size_t)len);
9595
}
9696

9797
return false;

0 commit comments

Comments
 (0)