Adding qid in bolt (#721)

This commit is contained in:
niko4299 2023-01-18 16:33:03 +01:00 committed by GitHub
parent 156e2cd095
commit d9eeedb9ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 38 deletions

View File

@ -146,9 +146,10 @@ QueryData Client::Execute(const std::string &query, const std::map<std::string,
throw ServerMalformedDataException();
}
auto &header = fields.ValueMap();
QueryData ret{{}, std::move(records), std::move(metadata.ValueMap())};
auto &header = fields.ValueMap();
if (header.find("fields") == header.end()) {
throw ServerMalformedDataException();
}
@ -164,6 +165,10 @@ QueryData Client::Execute(const std::string &query, const std::map<std::string,
ret.fields.emplace_back(std::move(field_item.ValueString()));
}
if (header.contains("qid")) {
ret.metadata["qid"] = header["qid"];
}
return ret;
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -73,40 +73,6 @@ inline std::pair<std::string, std::string> ExceptionToErrorMessage(const std::ex
namespace details {
template <typename TSession>
State HandleRun(TSession &session, const State state, const Value &query, const Value &params) {
if (state != State::Idle) {
// Client could potentially recover if we move to error state, but there is
// no legitimate situation in which well working client would end up in this
// situation.
spdlog::trace("Unexpected RUN command!");
return State::Close;
}
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
spdlog::debug("[Run] '{}'", query.ValueString());
try {
// Interpret can throw.
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap());
// Convert std::string to Value
std::vector<Value> vec;
std::map<std::string, Value> data;
vec.reserve(header.size());
for (auto &i : header) vec.emplace_back(std::move(i));
data.emplace("fields", std::move(vec));
// Send the header.
if (!session.encoder_.MessageSuccess(data)) {
spdlog::trace("Couldn't send query header!");
return State::Close;
}
return State::Result;
} catch (const std::exception &e) {
return HandleFailure(session, e);
}
}
template <bool is_pull, typename TSession>
State HandlePullDiscard(TSession &session, std::optional<int> n, std::optional<int> qid) {
try {
@ -229,7 +195,36 @@ State HandleRunV1(TSession &session, const State state, const Marker marker) {
return State::Close;
}
return details::HandleRun(session, state, query, params);
if (state != State::Idle) {
// Client could potentially recover if we move to error state, but there is
// no legitimate situation in which well working client would end up in this
// situation.
spdlog::trace("Unexpected RUN command!");
return State::Close;
}
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
spdlog::debug("[Run] '{}'", query.ValueString());
try {
// Interpret can throw.
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap());
// Convert std::string to Value
std::vector<Value> vec;
std::map<std::string, Value> data;
vec.reserve(header.size());
for (auto &i : header) vec.emplace_back(std::move(i));
data.emplace("fields", std::move(vec));
// Send the header.
if (!session.encoder_.MessageSuccess(data)) {
spdlog::trace("Couldn't send query header!");
return State::Close;
}
return State::Result;
} catch (const std::exception &e) {
return HandleFailure(session, e);
}
}
template <typename TSession>
@ -257,7 +252,40 @@ State HandleRunV4(TSession &session, const State state, const Marker marker) {
spdlog::trace("Couldn't read extra field!");
}
return details::HandleRun(session, state, query, params);
if (state != State::Idle) {
// Client could potentially recover if we move to error state, but there is
// no legitimate situation in which well working client would end up in this
// situation.
spdlog::trace("Unexpected RUN command!");
return State::Close;
}
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
spdlog::debug("[Run] '{}'", query.ValueString());
try {
// Interpret can throw.
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap());
// Convert std::string to Value
std::vector<Value> vec;
std::map<std::string, Value> data;
vec.reserve(header.size());
for (auto &i : header) vec.emplace_back(std::move(i));
data.emplace("fields", std::move(vec));
if (qid.has_value()) {
data.emplace("qid", Value{*qid});
}
// Send the header.
if (!session.encoder_.MessageSuccess(data)) {
spdlog::trace("Couldn't send query header!");
return State::Close;
}
return State::Result;
} catch (const std::exception &e) {
return HandleFailure(session, e);
}
}
template <typename TSession>

View File

@ -45,6 +45,21 @@ class BoltClient : public ::testing::Test {
return true;
}
bool ExecuteAndCheckQid(const std::string &query, int qid, const std::string &message = "") {
try {
auto ret = client_.Execute(query, {});
if (ret.metadata["qid"].ValueInt() != qid) {
return false;
}
} catch (const ClientQueryException &e) {
if (message != "") {
EXPECT_EQ(e.what(), message);
}
throw;
}
return true;
}
int64_t GetCount() {
auto ret = client_.Execute("match (n) return count(n)", {});
EXPECT_EQ(ret.records.size(), 1);
@ -461,6 +476,18 @@ TEST_F(BoltClient, MixedCaseAndWhitespace) {
EXPECT_FALSE(TransactionActive());
}
TEST_F(BoltClient, TestQid) {
for (int i = 0; i < 3; ++i) {
EXPECT_TRUE(Execute("match (n) return count(n)"));
}
EXPECT_TRUE(Execute("begin"));
for (int i = 0; i < 3; ++i) {
EXPECT_TRUE(ExecuteAndCheckQid("match (n) return count(n)", i + 1));
}
EXPECT_TRUE(Execute("commit"));
EXPECT_FALSE(TransactionActive());
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);