Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448
#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449
#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451

#define REVISION DBMS_MIN_REVISION_WITH_OPENTELEMETRY
#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS

namespace clickhouse {

Expand Down Expand Up @@ -476,6 +479,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
return true;
}

case ServerCodes::ProfileEvents: {
if (!WireFormat::SkipString(*input_)) {
return false;
}

Block block;
if (!ReadBlock(*input_, &block)) {
return false;
}

if (events_) {
events_->OnProfileEvents(block);
}
return true;
}

default:
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
break;
Expand Down Expand Up @@ -649,6 +668,9 @@ void Client::Impl::SendQuery(const Query& query) {
WireFormat::WriteString(*output_, info.initial_user);
WireFormat::WriteString(*output_, info.initial_query_id);
WireFormat::WriteString(*output_, info.initial_address);
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME) {
WireFormat::WriteFixed<int64_t>(*output_, 0);
}
WireFormat::WriteFixed(*output_, info.iface_type);

WireFormat::WriteString(*output_, info.os_user);
Expand All @@ -660,6 +682,8 @@ void Client::Impl::SendQuery(const Query& query) {

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
WireFormat::WriteString(*output_, info.quota_key);
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
WireFormat::WriteUInt64(*output_, 0u);
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
WireFormat::WriteUInt64(*output_, info.client_version_patch);
}
Expand Down
5 changes: 5 additions & 0 deletions clickhouse/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ namespace clickhouse {
TablesStatusResponse = 9, /// Response to TableStatus.
Log = 10, /// Query execution log.
TableColumns = 11, /// Columns' description for default values calculation
PartUUIDs = 12, /// List of unique parts ids.
ReadTaskRequest = 13, /// String (UUID) describes a request for which next task is needed
/// This is such an inverted logic, where server sends requests
/// And client returns back response
ProfileEvents = 14, /// Packet with profile events from server.
};
}

Expand Down
17 changes: 17 additions & 0 deletions clickhouse/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class QueryEvents {
*/
virtual void OnServerLog(const Block& block) = 0;

/// Handle query execution profile events.
virtual void OnProfileEvents(const Block& block) = 0;

virtual void OnFinish() = 0;
};

Expand All @@ -75,6 +78,7 @@ using ProgressCallback = std::function<void(const Progress& progress)>;
using SelectCallback = std::function<void(const Block& block)>;
using SelectCancelableCallback = std::function<bool(const Block& block)>;
using SelectServerLogCallback = std::function<bool(const Block& block)>;
using ProfileEventsCallback = std::function<bool(const Block& block)>;


class Query : public QueryEvents {
Expand Down Expand Up @@ -148,6 +152,12 @@ class Query : public QueryEvents {
return *this;
}

/// Set handler for receiving profile events.
inline Query& OnProfileEvents(ProfileEventsCallback cb) {
profile_events_callback_cb_ = std::move(cb);
return *this;
}

static const std::string default_query_id;

private:
Expand Down Expand Up @@ -187,6 +197,12 @@ class Query : public QueryEvents {
}
}

void OnProfileEvents(const Block& block) override {
if (profile_events_callback_cb_) {
profile_events_callback_cb_(block);
}
}

void OnFinish() override {
}

Expand All @@ -200,6 +216,7 @@ class Query : public QueryEvents {
SelectCallback select_cb_;
SelectCancelableCallback select_cancelable_cb_;
SelectServerLogCallback select_server_log_cb_;
ProfileEventsCallback profile_events_callback_cb_;
};

}
21 changes: 20 additions & 1 deletion ut/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,6 @@ TEST_P(ClientCase, ServerLogs) {
EXPECT_GT(received_row_count, 0U);
}


TEST_P(ClientCase, TracingContext) {
Block block;
createTableWithOneColumn<ColumnString>(block);
Expand All @@ -1152,6 +1151,26 @@ TEST_P(ClientCase, TracingContext) {
EXPECT_GT(received_rows, 0u);
}

TEST_P(ClientCase, OnProfileEvents) {
Block block;
createTableWithOneColumn<ColumnString>(block);

client_->Execute("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')");
size_t received_row_count = 0;
Query query("SELECT * FROM " + table_name);

query.OnProfileEvents([&](const Block& block) {
received_row_count += block.GetRowCount();
return true;
});
client_->Execute(query);

const int DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451;
if (client_->GetServerInfo().revision >= DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS) {
EXPECT_GT(received_row_count, 0U);
}
}

const auto LocalHostEndpoint = ClientOptions()
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))
Expand Down