diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index df013a0a..e4b0c7ef 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -39,8 +39,11 @@ #define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429 #define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441 #define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442 +#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448 +#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449 +#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451 -#define REVISION DBMS_MIN_REVISION_WITH_OPENTELEMETRY +#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS namespace clickhouse { @@ -476,6 +479,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { return true; } + case ServerCodes::ProfileEvents: { + if (!WireFormat::SkipString(*input_)) { + return false; + } + + Block block; + if (!ReadBlock(*input_, &block)) { + return false; + } + + if (events_) { + events_->OnProfileEvents(block); + } + return true; + } + default: throw UnimplementedError("unimplemented " + std::to_string((int)packet_type)); break; @@ -649,6 +668,9 @@ void Client::Impl::SendQuery(const Query& query) { WireFormat::WriteString(*output_, info.initial_user); WireFormat::WriteString(*output_, info.initial_query_id); WireFormat::WriteString(*output_, info.initial_address); + if (server_info_.revision >= DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME) { + WireFormat::WriteFixed(*output_, 0); + } WireFormat::WriteFixed(*output_, info.iface_type); WireFormat::WriteString(*output_, info.os_user); @@ -660,6 +682,8 @@ void Client::Impl::SendQuery(const Query& query) { if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) WireFormat::WriteString(*output_, info.quota_key); + if (server_info_.revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH) + WireFormat::WriteUInt64(*output_, 0u); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { WireFormat::WriteUInt64(*output_, info.client_version_patch); } diff --git a/clickhouse/protocol.h b/clickhouse/protocol.h index bd0ced62..8d361936 100644 --- a/clickhouse/protocol.h +++ b/clickhouse/protocol.h @@ -17,6 +17,11 @@ namespace clickhouse { TablesStatusResponse = 9, /// Response to TableStatus. Log = 10, /// Query execution log. TableColumns = 11, /// Columns' description for default values calculation + PartUUIDs = 12, /// List of unique parts ids. + ReadTaskRequest = 13, /// String (UUID) describes a request for which next task is needed + /// This is such an inverted logic, where server sends requests + /// And client returns back response + ProfileEvents = 14, /// Packet with profile events from server. }; } diff --git a/clickhouse/query.h b/clickhouse/query.h index 4b728e36..21d7231f 100644 --- a/clickhouse/query.h +++ b/clickhouse/query.h @@ -66,6 +66,9 @@ class QueryEvents { */ virtual void OnServerLog(const Block& block) = 0; + /// Handle query execution profile events. + virtual void OnProfileEvents(const Block& block) = 0; + virtual void OnFinish() = 0; }; @@ -75,6 +78,7 @@ using ProgressCallback = std::function; using SelectCallback = std::function; using SelectCancelableCallback = std::function; using SelectServerLogCallback = std::function; +using ProfileEventsCallback = std::function; class Query : public QueryEvents { @@ -148,6 +152,12 @@ class Query : public QueryEvents { return *this; } + /// Set handler for receiving profile events. + inline Query& OnProfileEvents(ProfileEventsCallback cb) { + profile_events_callback_cb_ = std::move(cb); + return *this; + } + static const std::string default_query_id; private: @@ -187,6 +197,12 @@ class Query : public QueryEvents { } } + void OnProfileEvents(const Block& block) override { + if (profile_events_callback_cb_) { + profile_events_callback_cb_(block); + } + } + void OnFinish() override { } @@ -200,6 +216,7 @@ class Query : public QueryEvents { SelectCallback select_cb_; SelectCancelableCallback select_cancelable_cb_; SelectServerLogCallback select_server_log_cb_; + ProfileEventsCallback profile_events_callback_cb_; }; } diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index ff102e1a..6a0af56b 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1126,7 +1126,6 @@ TEST_P(ClientCase, ServerLogs) { EXPECT_GT(received_row_count, 0U); } - TEST_P(ClientCase, TracingContext) { Block block; createTableWithOneColumn(block); @@ -1152,6 +1151,26 @@ TEST_P(ClientCase, TracingContext) { EXPECT_GT(received_rows, 0u); } +TEST_P(ClientCase, OnProfileEvents) { + Block block; + createTableWithOneColumn(block); + + client_->Execute("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')"); + size_t received_row_count = 0; + Query query("SELECT * FROM " + table_name); + + query.OnProfileEvents([&](const Block& block) { + received_row_count += block.GetRowCount(); + return true; + }); + client_->Execute(query); + + const int DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451; + if (client_->GetServerInfo().revision >= DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS) { + EXPECT_GT(received_row_count, 0U); + } +} + const auto LocalHostEndpoint = ClientOptions() .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000"))